From 90d9963570d5fa19c91d6b9dc860ec0bf2b11705 Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Sun, 19 Mar 2017 20:48:42 -0700
Subject: [PATCH] Converts the stats fetch from serial to parallel and snaps
 the last index.

---
 consul/autopilot.go          |  39 +++++++++----
 consul/server.go             |   2 +-
 consul/stats_fetcher.go      | 106 ++++++++++++++++++++--------------
 consul/stats_fetcher_test.go | 109 +++++++++++++++++++++++++++++++++++
 consul/structs/operator.go   |   4 +-
 5 files changed, 203 insertions(+), 57 deletions(-)
 create mode 100644 consul/stats_fetcher_test.go

diff --git a/consul/autopilot.go b/consul/autopilot.go
index e6746b7962..ba564a9926 100644
--- a/consul/autopilot.go
+++ b/consul/autopilot.go
@@ -1,6 +1,7 @@
 package consul
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"sync"
@@ -245,10 +246,25 @@
 	if err := future.Error(); err != nil {
 		return fmt.Errorf("error getting Raft configuration %s", err)
 	}
+	servers := future.Configuration().Servers
+
+	// Fetch the health for each of the servers in parallel so we get as
+	// consistent of a sample as possible. We capture the leader's index
+	// here as well so it roughly lines up with the same point in time.
+	targetLastIndex := s.raft.LastIndex()
+	var fetchList []*agent.Server
+	for _, server := range servers {
+		if parts, ok := serverMap[string(server.ID)]; ok {
+			fetchList = append(fetchList, parts)
+		}
+	}
+	d := time.Now().Add(s.config.ServerHealthInterval / 2)
+	ctx, cancel := context.WithDeadline(context.Background(), d)
+	defer cancel()
+	fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
 
 	// Build a current list of server healths
 	var clusterHealth structs.OperatorHealthReply
-	servers := future.Configuration().Servers
 	healthyCount := 0
 	voterCount := 0
 	for _, server := range servers {
@@ -263,8 +279,10 @@
 		if ok {
 			health.Name = parts.Name
 			health.SerfStatus = parts.Status
-			if err := s.updateServerHealth(&health, parts, autopilotConf); err != nil {
-				s.logger.Printf("[WARN] consul: error getting server health: %s", err)
+			if stats, ok := fetchedStats[string(server.ID)]; ok {
+				if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
+					s.logger.Printf("[WARN] consul: error updating server health: %s", err)
+				}
 			}
 		} else {
 			health.SerfStatus = serf.StatusNone
@@ -304,18 +322,17 @@
 	return nil
 }
 
-// updateServerHealth fetches the raft stats for the given server and uses them
-// to update its ServerHealth
-func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.Server, autopilotConf *structs.AutopilotConfig) error {
-	stats, err := s.statsFetcher.Fetch(server, s.config.ServerHealthInterval/2)
-	if err != nil {
-		return fmt.Errorf("error getting raft stats for %q: %s", server.Name, err)
-	}
+// updateServerHealth computes the resulting health of the server based on its
+// fetched stats and the state of the leader.
+func (s *Server) updateServerHealth(health *structs.ServerHealth,
+	server *agent.Server, stats *structs.ServerStats,
+	autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
 	health.LastTerm = stats.LastTerm
 	health.LastIndex = stats.LastIndex
 
 	if stats.LastContact != "never" {
+		var err error
 		health.LastContact, err = time.ParseDuration(stats.LastContact)
 		if err != nil {
 			return fmt.Errorf("error parsing last_contact duration: %s", err)
 		}
@@ -326,7 +343,7 @@ func (s *Server) updateServerHealth(health *structs.ServerHealth, server *agent.
 	if err != nil {
 		return fmt.Errorf("error parsing last_log_term: %s", err)
 	}
-	health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
+	health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
 
 	// If this is a new server or the health changed, reset StableSince
 	lastHealth := s.getServerHealth(server.ID)
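With the change above, updateClusterHealth snapshots the leader's last index once (targetLastIndex) and bounds the whole parallel stats fetch with a deadline of half ServerHealthInterval, so every server is measured against roughly the same point in time and a slow server can only cost its own slot in the sample. A minimal standalone sketch of that fan-out-then-collect-until-deadline pattern (not Consul code; slowRPC, interval, and reply are invented names) looks like this:

    package main

    import (
    	"context"
    	"fmt"
    	"math/rand"
    	"time"
    )

    // slowRPC stands in for the Status.RaftStats call; the name is invented.
    func slowRPC(id int) int {
    	time.Sleep(time.Duration(rand.Intn(80)) * time.Millisecond)
    	return id * 100
    }

    func main() {
    	const interval = 100 * time.Millisecond // stands in for ServerHealthInterval
    	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(interval/2))
    	defer cancel()

    	const n = 5
    	type reply struct{ id, stats int }
    	replyCh := make(chan reply, n) // buffered so late workers can still finish and exit
    	for id := 0; id < n; id++ {
    		go func(id int) { replyCh <- reply{id, slowRPC(id)} }(id)
    	}

    	// Collect until the deadline; whatever is missing just stays out of the map.
    	results := make(map[int]int)
    	for i := 0; i < n; i++ {
    		select {
    		case r := <-replyCh:
    			results[r.id] = r.stats
    		case <-ctx.Done():
    			// Deadline hit: skip this reply. Later iterations take this branch
    			// immediately, so the loop still finishes promptly.
    		}
    	}
    	fmt.Printf("collected %d of %d samples: %v\n", len(results), n, results)
    }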
diff --git a/consul/server.go b/consul/server.go
index 82f3597cf5..75a8ed7acb 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -260,7 +260,7 @@ func NewServer(config *Config) (*Server, error) {
 	s.autopilotPolicy = &BasicAutopilot{s}
 
 	// Initialize the stats fetcher that autopilot will use.
-	s.statsFetcher = NewStatsFetcher(s.shutdownCh, s.connPool, s.config.Datacenter)
+	s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
 
 	// Initialize the authoritative ACL cache.
 	s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
diff --git a/consul/stats_fetcher.go b/consul/stats_fetcher.go
index ab7216d3aa..c3ab71e9cf 100644
--- a/consul/stats_fetcher.go
+++ b/consul/stats_fetcher.go
@@ -1,19 +1,23 @@
 package consul
 
 import (
-	"fmt"
+	"context"
+	"log"
 	"sync"
-	"time"
 
 	"github.com/hashicorp/consul/consul/agent"
 	"github.com/hashicorp/consul/consul/structs"
 )
 
-// StatsFetcher makes sure there's only one in-flight request for stats at any
-// given time, and allows us to have a timeout so the autopilot loop doesn't get
-// blocked if there's a slow server.
+// StatsFetcher has two functions for autopilot. First, it lets us fetch all the
+// stats in parallel so we are taking a sample as close to the same time as
+// possible, since we are comparing time-sensitive info for the health check.
+// Second, it bounds the time so that one slow RPC can't hold up the health
+// check loop; as a side effect of how it implements this, it also limits to
+// a single in-flight RPC to any given server, so goroutines don't accumulate
+// as we run the health check fairly frequently.
 type StatsFetcher struct {
-	shutdownCh   <-chan struct{}
+	logger       *log.Logger
 	pool         *ConnPool
 	datacenter   string
 	inflight     map[string]struct{}
@@ -21,54 +25,70 @@
 }
 
 // NewStatsFetcher returns a stats fetcher.
-func NewStatsFetcher(shutdownCh <-chan struct{}, pool *ConnPool, datacenter string) *StatsFetcher {
+func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher {
 	return &StatsFetcher{
-		shutdownCh: shutdownCh,
+		logger:     logger,
 		pool:       pool,
 		datacenter: datacenter,
 		inflight:   make(map[string]struct{}),
 	}
 }
 
-// Fetch will attempt to get the server health for up to the timeout, and will
-// also return an error immediately if there is a request still outstanding. We
-// throw away results from any outstanding requests since we don't want to
-// ingest stale health data.
-func (f *StatsFetcher) Fetch(server *agent.Server, timeout time.Duration) (*structs.ServerStats, error) {
-	// Don't allow another request if there's another one outstanding.
-	f.inflightLock.Lock()
-	if _, ok := f.inflight[server.ID]; ok {
-		f.inflightLock.Unlock()
-		return nil, fmt.Errorf("stats request already outstanding")
+// fetch does the RPC to fetch the server stats from a single server. We don't
+// cancel this when the context is canceled because we only want one in-flight
+// RPC to each server, so we let it finish and then clean up the in-flight
+// tracking.
+func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) {
+	var args struct{}
+	var reply structs.ServerStats
+	err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
+	if err != nil {
+		f.logger.Printf("[WARN] consul: error getting server health from %q: %v", server.Name, err)
+	} else {
+		replyCh <- &reply
+	}
+
+	f.inflightLock.Lock()
+	delete(f.inflight, server.ID)
+	f.inflightLock.Unlock()
+}
+
+// Fetch will attempt to query all the servers in parallel.
+func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats {
+	type workItem struct {
+		server  *agent.Server
+		replyCh chan *structs.ServerStats
+	}
+	var work []*workItem
+
+	// Skip any servers that have inflight requests.
+	f.inflightLock.Lock()
+	for _, server := range servers {
+		if _, ok := f.inflight[server.ID]; ok {
+			f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding", server.Name)
+		} else {
+			workItem := &workItem{
+				server:  server,
+				replyCh: make(chan *structs.ServerStats, 1),
+			}
+			work = append(work, workItem)
+			f.inflight[server.ID] = struct{}{}
+			go f.fetch(workItem.server, workItem.replyCh)
+		}
 	}
-	f.inflight[server.ID] = struct{}{}
 	f.inflightLock.Unlock()
 
-	// Make the request in a goroutine.
-	errCh := make(chan error, 1)
-	var reply structs.ServerStats
-	go func() {
-		var args struct{}
-		errCh <- f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
+	// Now wait for the results to come in, or for the context to be
+	// canceled.
+	replies := make(map[string]*structs.ServerStats)
+	for _, workItem := range work {
+		select {
+		case reply := <-workItem.replyCh:
+			replies[workItem.server.ID] = reply
 
-		f.inflightLock.Lock()
-		delete(f.inflight, server.ID)
-		f.inflightLock.Unlock()
-	}()
-
-	// Wait for something to happen.
-	select {
-	case <-f.shutdownCh:
-		return nil, fmt.Errorf("shutdown")
-
-	case err := <-errCh:
-		if err == nil {
-			return &reply, nil
-		} else {
-			return nil, err
+		case <-ctx.Done():
+			// Give up on this and any remaining outstanding RPCs.
 		}
-
-	case <-time.After(timeout):
-		return nil, fmt.Errorf("timeout")
 	}
+	return replies
 }
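Fetch hands each worker a reply channel of capacity one and deliberately leaves the RPC running when the context expires: the worker can always deposit its late reply and clear its inflight entry, and the next Fetch simply skips any server whose previous RPC is still outstanding. A toy, standalone illustration of why that one-slot buffer matters (none of these names are Consul's):

    package main

    import (
    	"fmt"
    	"runtime"
    	"time"
    )

    // worker plays the role of fetch: it is never canceled, so it must be able
    // to deliver its late reply without blocking once the caller has moved on.
    func worker(replyCh chan int) {
    	time.Sleep(50 * time.Millisecond) // a "slow RPC"
    	replyCh <- 42                     // never blocks while cap(replyCh) == 1
    }

    func main() {
    	replyCh := make(chan int, 1) // change to make(chan int) to see the goroutine leak
    	go worker(replyCh)

    	select {
    	case v := <-replyCh:
    		fmt.Println("got reply:", v)
    	case <-time.After(10 * time.Millisecond):
    		fmt.Println("gave up waiting") // the caller moves on, like Fetch on ctx.Done()
    	}

    	time.Sleep(100 * time.Millisecond) // give the worker time to finish (or get stuck)
    	fmt.Println("goroutines still running:", runtime.NumGoroutine())
    }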
diff --git a/consul/stats_fetcher_test.go b/consul/stats_fetcher_test.go
new file mode 100644
index 0000000000..38d2673b93
--- /dev/null
+++ b/consul/stats_fetcher_test.go
@@ -0,0 +1,109 @@
+package consul
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/consul/agent"
+	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/consul/types"
+)
+
+func TestStatsFetcher(t *testing.T) {
+	dir1, s1 := testServerDCExpect(t, "dc1", 3)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerDCExpect(t, "dc1", 3)
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	dir3, s3 := testServerDCExpect(t, "dc1", 3)
+	defer os.RemoveAll(dir3)
+	defer s3.Shutdown()
+
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinLAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if _, err := s3.JoinLAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+	members := s1.serfLAN.Members()
+	if len(members) != 3 {
+		t.Fatalf("bad len: %d", len(members))
+	}
+
+	var servers []*agent.Server
+	for _, member := range members {
+		ok, server := agent.IsConsulServer(member)
+		if !ok {
+			t.Fatalf("bad: %#v", member)
+		}
+		servers = append(servers, server)
+	}
+
+	// Do a normal fetch and make sure we get three responses.
+	func() {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+		stats := s1.statsFetcher.Fetch(ctx, servers)
+		if len(stats) != 3 {
+			t.Fatalf("bad: %#v", stats)
+		}
+		for id, stat := range stats {
+			switch types.NodeID(id) {
+			case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID:
+				// OK
+			default:
+				t.Fatalf("bad: %s", id)
+			}
+
+			if stat == nil || stat.LastTerm == 0 {
+				t.Fatalf("bad: %#v", stat)
+			}
+		}
+	}()
+
+	// Fake an in-flight request to server 3 and make sure we don't fetch
+	// from it.
+	func() {
+		s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{}
+		defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID))
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+		stats := s1.statsFetcher.Fetch(ctx, servers)
+		if len(stats) != 2 {
+			t.Fatalf("bad: %#v", stats)
+		}
+		for id, stat := range stats {
+			switch types.NodeID(id) {
+			case s1.config.NodeID, s2.config.NodeID:
+				// OK
+			case s3.config.NodeID:
+				t.Fatalf("bad")
+			default:
+				t.Fatalf("bad: %s", id)
+			}
+
+			if stat == nil || stat.LastTerm == 0 {
+				t.Fatalf("bad: %#v", stat)
+			}
+		}
+	}()
+
+	// Do a fetch with a canceled context and make sure we bail right away.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	cancel()
+	stats := s1.statsFetcher.Fetch(ctx, servers)
+	if len(stats) != 0 {
+		t.Fatalf("bad: %#v", stats)
+	}
+}
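The new test drives all three paths against real servers: a normal fetch, the in-flight skip, and the canceled-context early return. Assuming the repository layout of this era, with the server package under ./consul and its test helpers alongside, it can be run on its own from the repo root with something like:

    go test -v -run TestStatsFetcher ./consul/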
diff --git a/consul/structs/operator.go b/consul/structs/operator.go
index 619accaeee..8ebb5a8eb1 100644
--- a/consul/structs/operator.go
+++ b/consul/structs/operator.go
@@ -139,7 +139,7 @@ type ServerHealth struct {
 
 // IsHealthy determines whether this ServerHealth is considered healthy
 // based on the given Autopilot config
-func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool {
+func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
 	if h.SerfStatus != serf.StatusAlive {
 		return false
 	}
@@ -152,7 +152,7 @@ func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotCon
 		return false
 	}
 
-	if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs {
+	if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
 		return false
 	}
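The rename from lastIndex to leaderLastIndex makes it explicit that each server is judged against the leader's last index snapshotted once at fetch time, not a value re-read per server. A standalone numeric sketch of the condition (not Consul code; trailingTooFar and its parameters are invented names) behaves like this:

    package main

    import "fmt"

    // trailingTooFar mirrors the check above: a follower is unhealthy when it
    // trails the leader's snapshotted index by more than MaxTrailingLogs. The
    // leaderLastIndex > maxTrailingLogs guard also keeps the uint64 subtraction
    // from underflowing on a brand-new cluster with a tiny index.
    func trailingTooFar(leaderLastIndex, followerLastIndex, maxTrailingLogs uint64) bool {
    	return leaderLastIndex > maxTrailingLogs &&
    		followerLastIndex < leaderLastIndex-maxTrailingLogs
    }

    func main() {
    	fmt.Println(trailingTooFar(10000, 9800, 250)) // false: trailing by 200, within the limit
    	fmt.Println(trailingTooFar(10000, 9700, 250)) // true: trailing by 300
    	fmt.Println(trailingTooFar(100, 0, 250))      // false: guard short-circuits
    }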