Converts the stats fetch from serial to parallel and snapshots the leader's last index.

James Phillips 2017-03-19 20:48:42 -07:00
parent 867421b7d3
commit 90d9963570
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 203 additions and 57 deletions

View File

@@ -1,6 +1,7 @@
package consul
import (
"context"
"fmt"
"strconv"
"sync"
@@ -245,10 +246,25 @@ func (s *Server) updateClusterHealth() error {
if err := future.Error(); err != nil {
return fmt.Errorf("error getting Raft configuration %s", err)
}
servers := future.Configuration().Servers
// Fetch the health for each of the servers in parallel so we get as
// consistent a sample as possible. We capture the leader's index
// here as well so it roughly lines up with the same point in time.
targetLastIndex := s.raft.LastIndex()
var fetchList []*agent.Server
for _, server := range servers {
if parts, ok := serverMap[string(server.ID)]; ok {
fetchList = append(fetchList, parts)
}
}
d := time.Now().Add(s.config.ServerHealthInterval / 2)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
// Build a current list of server healths
var clusterHealth structs.OperatorHealthReply
servers := future.Configuration().Servers
healthyCount := 0
voterCount := 0
for _, server := range servers {
@@ -263,8 +279,10 @@ func (s *Server) updateClusterHealth() error {
if ok {
health.Name = parts.Name
health.SerfStatus = parts.Status
if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil {
s.logger.Printf("[WARN] consul: error getting server health: %s", err)
if stats, ok := fetchedStats[string(server.ID)]; ok {
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
s.logger.Printf("[WARN] consul: error updating server health: %s", err)
}
}
} else {
health.SerfStatus = serf.StatusNone
@@ -304,18 +322,17 @@ func (s *Server) updateClusterHealth() error {
return nil
}
// updateServerHealth fetches the raft stats for the given server and uses them
// to update its ServerHealth
func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error {
stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2)
if err != nil {
return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err)
}
// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (s *Server) updateServerHealth(health *structs.ServerHealth,
server *agent.Server, stats *structs.ServerStats,
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
health.LastTerm = stats.LastTerm
health.LastIndex = stats.LastIndex
if stats.LastContact != "never" {
var err error
health.LastContact, err = time.ParseDuration(stats.LastContact)
if err != nil {
return fmt.Errorf("error parsing last_contact duration: %s", err)
@@ -326,7 +343,7 @@ func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.
if err != nil {
return fmt.Errorf("error parsing last_log_term: %s", err)
}
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
// If this is a new server or the health changed, reset StableSince
lastHealth := s.getServerHealth(server.ID)
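The flow above snapshots the leader's last index once and then fetches every server's stats under a single deadline of half the health interval, so all comparisons line up with roughly the same point in time. Below is a minimal, self-contained sketch of that deadline-bounded fan-out pattern; the Stats type, the fetchAll helper, and the two-second interval are illustrative stand-ins, not Consul's actual types.

package main

import (
	"context"
	"fmt"
	"time"
)

// Stats is a stand-in for the per-server stats structure.
type Stats struct {
	LastIndex uint64
}

// fetchAll queries every server concurrently and stops waiting for stragglers
// once the context's deadline passes, returning whatever arrived in time.
func fetchAll(ctx context.Context, servers []string) map[string]Stats {
	type result struct {
		name  string
		stats Stats
	}
	ch := make(chan result, len(servers)) // buffered so late replies never block
	for _, name := range servers {
		go func(name string) {
			// A real implementation would issue an RPC to the server here.
			ch <- result{name: name, stats: Stats{LastIndex: 42}}
		}(name)
	}

	out := make(map[string]Stats)
	for range servers {
		select {
		case r := <-ch:
			out[r.name] = r.stats
		case <-ctx.Done():
			return out // deadline hit; keep the partial sample
		}
	}
	return out
}

func main() {
	// Bound the whole fetch to half the (illustrative) health interval.
	interval := 2 * time.Second
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(interval/2))
	defer cancel()

	stats := fetchAll(ctx, []string{"s1", "s2", "s3"})
	fmt.Printf("%d servers answered in time\n", len(stats))
}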

View File

@@ -260,7 +260,7 @@ func NewServer(config *Config) (*Server, error) {
s.autopilotPolicy = &BasicAutopilot{s}
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter)
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
// Initialize the authoritative ACL cache.
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)

View File

@@ -1,19 +1,23 @@
package consul
import (
"fmt"
"context"
"log"
"sync"
"time"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs"
)
// StatsFetcher makes sure there's only one in-flight request for stats at any
// given time, and allows us to have a timeout so the autopilot loop doesn't get
// blocked if there's a slow server.
// StatsFetcher has two functions for autopilot. First, it lets us fetch all the
// stats in parallel so we are taking a sample as close to the same time as
// possible, since we are comparing time-sensitive info for the health check.
// Second, it bounds the time so that one slow RPC can't hold up the health
// check loop; as a side effect of how it implements this, it also limits us to
// a single in-flight RPC to any given server, so goroutines don't accumulate
// as we run the health check fairly frequently.
type StatsFetcher struct {
shutdownCh <-chan struct{}
logger *log.Logger
pool *ConnPool
datacenter string
inflight map[string]struct{}
@@ -21,54 +25,70 @@ type StatsFetcher struct {
}
// NewStatsFetcher returns a stats fetcher.
func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher {
func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher {
return &StatsFetcher{
shutdownCh: shutdownCh,
logger: logger,
pool: pool,
datacenter: datacenter,
inflight: make(map[string]struct{}),
}
}
// Fetch will attempt to get the server health for up to the timeout, and will
// also return an error immediately if there is a request still outstanding. We
// throw away results from any outstanding requests since we don't want to
// ingest stale health data.
func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) {
// Don't allow another request if there's another one outstanding.
f.inflightLock.Lock()
if _, ok := f.inflight[server.ID]; ok {
f.inflightLock.Unlock()
return nil, fmt.Errorf("stats request already outstanding")
// fetch does the RPC to fetch the server stats from a single server. We don't
// cancel this when the context is canceled because we only want one in-flight
// RPC to each server, so we let it finish and then clean up the in-flight
// tracking.
func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) {
var args struct{}
var reply structs.ServerStats
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Printf("[WARN] consul: error getting server health from %q: %v", server.Name, err)
} else {
replyCh <- &reply
}
f.inflightLock.Lock()
delete(f.inflight, server.ID)
f.inflightLock.Unlock()
}
// Fetch will attempt to query all the servers in parallel.
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats {
type workItem struct {
server *agent.Server
replyCh chan *structs.ServerStats
}
var work []*workItem
// Skip any servers that have inflight requests.
f.inflightLock.Lock()
for _, server := range servers {
if _, ok := f.inflight[server.ID]; ok {
f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", server.Name)
} else {
workItem := &workItem{
server: server,
replyCh: make(chan *structs.ServerStats, 1),
}
work = append(work, workItem)
f.inflight[server.ID] = struct{}{}
go f.fetch(workItem.server, workItem.replyCh)
}
}
f.inflight[server.ID] = struct{}{}
f.inflightLock.Unlock()
// Make the request in a goroutine.
errCh := make(chan error, 1)
var reply structs.ServerStats
go func() {
var args struct{}
errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
// Now wait for the results to come in, or for the context to be
// canceled.
replies := make(map[string]*structs.ServerStats)
for _, workItem := range work {
select {
case reply := <-workItem.replyCh:
replies[workItem.server.ID] = reply
f.inflightLock.Lock()
delete(f.inflight, server.ID)
f.inflightLock.Unlock()
}()
// Wait for something to happen.
select {
case <-f.shutdownCh:
return nil, fmt.Errorf("shutdown")
case err := <-errCh:
if err == nil {
return &reply, nil
} else {
return nil, err
case <-ctx.Done():
// Give up on this and any remaining outstanding RPCs.
}
case <-time.After(timeout):
return nil, fmt.Errorf("timeout")
}
return replies
}
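A design detail worth noting in the new Fetch: each workItem's reply channel is created with a buffer of one, and fetch is deliberately not canceled when the context is, so an RPC that completes after the deadline can still deliver (or drop) its reply and clear its inflight entry without blocking forever. The following is a small sketch of that buffered-channel behavior, using made-up names rather than Consul's.

package main

import (
	"context"
	"fmt"
	"time"
)

// slowWorker finishes after the collector has already given up. Because the
// channel has a buffer of one, the send still succeeds and the goroutine exits.
func slowWorker(replyCh chan string) {
	time.Sleep(50 * time.Millisecond)
	replyCh <- "late reply" // would block forever on an unbuffered channel
	fmt.Println("worker exited cleanly")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	replyCh := make(chan string, 1)
	go slowWorker(replyCh)

	select {
	case r := <-replyCh:
		fmt.Println("got:", r)
	case <-ctx.Done():
		fmt.Println("gave up:", ctx.Err()) // the worker is abandoned, not killed
	}

	// Wait long enough to observe the worker finishing on its own.
	time.Sleep(100 * time.Millisecond)
}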

View File

@@ -0,0 +1,109 @@
package consul
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
)
func TestStatsFetcher(t *testing.T) {
dir1, s1 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCExpect(t, "dc1", 3)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
members := s1.serfLAN.Members()
if len(members) != 3 {
t.Fatalf("bad len: %d", len(members))
}
var servers []*agent.Server
for _, member := range members {
ok, server := agent.IsConsulServer(member)
if !ok {
t.Fatalf("bad: %#v", member)
}
servers = append(servers, server)
}
// Do a normal fetch and make sure we get three responses.
func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
if len(stats) != 3 {
t.Fatalf("bad: %#v", stats)
}
for id, stat := range stats {
switch types.NodeID(id) {
case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID:
// OK
default:
t.Fatalf("bad: %s", id)
}
if stat == nil || stat.LastTerm == 0 {
t.Fatalf("bad: %#v", stat)
}
}
}()
// Fake an in-flight request to server 3 and make sure we don't fetch
// from it.
func() {
s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{}
defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID))
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
if len(stats) != 2 {
t.Fatalf("bad: %#v", stats)
}
for id, stat := range stats {
switch types.NodeID(id) {
case s1.config.NodeID, s2.config.NodeID:
// OK
case s3.config.NodeID:
t.Fatalf("bad")
default:
t.Fatalf("bad: %s", id)
}
if stat == nil || stat.LastTerm == 0 {
t.Fatalf("bad: %#v", stat)
}
}
}()
// Do a fetch with a canceled context and make sure we bail right away.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
cancel()
stats := s1.statsFetcher.Fetch(ctx, servers)
if len(stats) != 0 {
t.Fatalf("bad: %#v", stats)
}
}

View File

@@ -139,7 +139,7 @@ type ServerHealth struct {
// IsHealthy determines whether this ServerHealth is considered healthy
// based on the given Autopilot config
func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool {
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
if h.SerfStatus != serf.StatusAlive {
return false
}
@@ -152,7 +152,7 @@ func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotCon
return false
}
if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs {
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
return false
}
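Inside IsHealthy this change is only a rename of lastIndex to leaderLastIndex (the behavioral change is that callers now pass the index snapshotted once per health pass), but the guard on it is worth spelling out: both values are uint64, so without the leaderLastIndex > MaxTrailingLogs check the subtraction would wrap around on a young cluster whose leader index is still below the threshold and flag every server as trailing. A small sketch with made-up numbers:

package main

import "fmt"

// trailingTooFar mirrors the shape of the MaxTrailingLogs check above, with
// plain uint64 arguments instead of Consul's structs.
func trailingTooFar(followerIndex, leaderIndex, maxTrailing uint64) bool {
	return leaderIndex > maxTrailing && followerIndex < leaderIndex-maxTrailing
}

func main() {
	// Young cluster: leader at index 100 with a threshold of 250. The guard
	// short-circuits, so the follower is not flagged.
	fmt.Println(trailingTooFar(90, 100, 250)) // false

	// Without the guard, the unsigned subtraction would wrap around:
	leaderIndex, maxTrailing := uint64(100), uint64(250)
	fmt.Println(leaderIndex - maxTrailing) // 18446744073709551466

	// Established cluster: a follower 500 entries behind is flagged.
	fmt.Println(trailingTooFar(9500, 10000, 250)) // true
}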