mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 06:44:41 +00:00
Merge pull request #2805 from hashicorp/stats-fetcher
Adds a stats fetcher to make sure we don't block the autopilot loop.
This commit is contained in:
commit
aabd802c06
@ -34,6 +34,7 @@ type Server struct {
|
|||||||
Version int
|
Version int
|
||||||
RaftVersion int
|
RaftVersion int
|
||||||
Addr net.Addr
|
Addr net.Addr
|
||||||
|
Status serf.MemberStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key returns the corresponding Key
|
// Key returns the corresponding Key
|
||||||
@ -104,6 +105,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
|
|||||||
Addr: addr,
|
Addr: addr,
|
||||||
Version: vsn,
|
Version: vsn,
|
||||||
RaftVersion: raft_vsn,
|
RaftVersion: raft_vsn,
|
||||||
|
Status: m.Status,
|
||||||
}
|
}
|
||||||
return true, parts
|
return true, parts
|
||||||
}
|
}
|
||||||
|
@ -62,6 +62,7 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
"vsn": "1",
|
"vsn": "1",
|
||||||
"raft_vsn": "3",
|
"raft_vsn": "3",
|
||||||
},
|
},
|
||||||
|
Status: serf.StatusLeft,
|
||||||
}
|
}
|
||||||
ok, parts := agent.IsConsulServer(m)
|
ok, parts := agent.IsConsulServer(m)
|
||||||
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
||||||
@ -82,6 +83,9 @@ func TestIsConsulServer(t *testing.T) {
|
|||||||
if parts.RaftVersion != 3 {
|
if parts.RaftVersion != 3 {
|
||||||
t.Fatalf("bad: %v", parts.RaftVersion)
|
t.Fatalf("bad: %v", parts.RaftVersion)
|
||||||
}
|
}
|
||||||
|
if parts.Status != serf.StatusLeft {
|
||||||
|
t.Fatalf("bad: %v", parts.Status)
|
||||||
|
}
|
||||||
m.Tags["bootstrap"] = "1"
|
m.Tags["bootstrap"] = "1"
|
||||||
m.Tags["disabled"] = "1"
|
m.Tags["disabled"] = "1"
|
||||||
ok, parts = agent.IsConsulServer(m)
|
ok, parts = agent.IsConsulServer(m)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
@ -229,7 +230,7 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the the serf members which are Consul servers
|
// Get the the serf members which are Consul servers
|
||||||
serverMap := make(map[string]serf.Member)
|
serverMap := make(map[string]*agent.Server)
|
||||||
for _, member := range s.LANMembers() {
|
for _, member := range s.LANMembers() {
|
||||||
if member.Status == serf.StatusLeft {
|
if member.Status == serf.StatusLeft {
|
||||||
continue
|
continue
|
||||||
@ -237,7 +238,7 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
|
|
||||||
valid, parts := agent.IsConsulServer(member)
|
valid, parts := agent.IsConsulServer(member)
|
||||||
if valid {
|
if valid {
|
||||||
serverMap[parts.ID] = member
|
serverMap[parts.ID] = parts
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,10 +246,25 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
if err := future.Error(); err != nil {
|
if err := future.Error(); err != nil {
|
||||||
return fmt.Errorf("error getting Raft configuration %s", err)
|
return fmt.Errorf("error getting Raft configuration %s", err)
|
||||||
}
|
}
|
||||||
|
servers := future.Configuration().Servers
|
||||||
|
|
||||||
|
// Fetch the health for each of the servers in parallel so we get as
|
||||||
|
// consistent of a sample as possible. We capture the leader's index
|
||||||
|
// here as well so it roughly lines up with the same point in time.
|
||||||
|
targetLastIndex := s.raft.LastIndex()
|
||||||
|
var fetchList []*agent.Server
|
||||||
|
for _, server := range servers {
|
||||||
|
if parts, ok := serverMap[string(server.ID)]; ok {
|
||||||
|
fetchList = append(fetchList, parts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
d := time.Now().Add(s.config.ServerHealthInterval / 2)
|
||||||
|
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||||
|
defer cancel()
|
||||||
|
fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)
|
||||||
|
|
||||||
// Build a current list of server healths
|
// Build a current list of server healths
|
||||||
var clusterHealth structs.OperatorHealthReply
|
var clusterHealth structs.OperatorHealthReply
|
||||||
servers := future.Configuration().Servers
|
|
||||||
healthyCount := 0
|
healthyCount := 0
|
||||||
voterCount := 0
|
voterCount := 0
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
@ -259,12 +275,14 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
Voter: server.Suffrage == raft.Voter,
|
Voter: server.Suffrage == raft.Voter,
|
||||||
}
|
}
|
||||||
|
|
||||||
member, ok := serverMap[string(server.ID)]
|
parts, ok := serverMap[string(server.ID)]
|
||||||
if ok {
|
if ok {
|
||||||
health.Name = member.Name
|
health.Name = parts.Name
|
||||||
health.SerfStatus = member.Status
|
health.SerfStatus = parts.Status
|
||||||
if err := s.updateServerHealth(&health, member, autopilotConf); err != nil {
|
if stats, ok := fetchedStats[string(server.ID)]; ok {
|
||||||
s.logger.Printf("[ERR] consul: error getting server health: %s", err)
|
if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
|
||||||
|
s.logger.Printf("[WARN] consul: error updating server health: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
health.SerfStatus = serf.StatusNone
|
health.SerfStatus = serf.StatusNone
|
||||||
@ -304,20 +322,17 @@ func (s *Server) updateClusterHealth() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateServerHealth fetches the raft stats for the given server and uses them
|
// updateServerHealth computes the resulting health of the server based on its
|
||||||
// to update its ServerHealth
|
// fetched stats and the state of the leader.
|
||||||
func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Member, autopilotConf *structs.AutopilotConfig) error {
|
func (s *Server) updateServerHealth(health *structs.ServerHealth,
|
||||||
_, server := agent.IsConsulServer(member)
|
server *agent.Server, stats *structs.ServerStats,
|
||||||
|
autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {
|
||||||
stats, err := s.getServerStats(server)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error getting raft stats: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
health.LastTerm = stats.LastTerm
|
health.LastTerm = stats.LastTerm
|
||||||
health.LastIndex = stats.LastIndex
|
health.LastIndex = stats.LastIndex
|
||||||
|
|
||||||
if stats.LastContact != "never" {
|
if stats.LastContact != "never" {
|
||||||
|
var err error
|
||||||
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
health.LastContact, err = time.ParseDuration(stats.LastContact)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
return fmt.Errorf("error parsing last_contact duration: %s", err)
|
||||||
@ -328,7 +343,7 @@ func (s *Server) updateServerHealth(health *structs.ServerHealth, member serf.Me
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing last_log_term: %s", err)
|
return fmt.Errorf("error parsing last_log_term: %s", err)
|
||||||
}
|
}
|
||||||
health.Healthy = health.IsHealthy(lastTerm, s.raft.LastIndex(), autopilotConf)
|
health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)
|
||||||
|
|
||||||
// If this is a new server or the health changed, reset StableSince
|
// If this is a new server or the health changed, reset StableSince
|
||||||
lastHealth := s.getServerHealth(server.ID)
|
lastHealth := s.getServerHealth(server.ID)
|
||||||
@ -357,10 +372,3 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
|
|
||||||
var args struct{}
|
|
||||||
var reply structs.ServerStats
|
|
||||||
err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
|
|
||||||
return reply, err
|
|
||||||
}
|
|
||||||
|
@ -161,6 +161,10 @@ type Server struct {
|
|||||||
sessionTimers map[string]*time.Timer
|
sessionTimers map[string]*time.Timer
|
||||||
sessionTimersLock sync.Mutex
|
sessionTimersLock sync.Mutex
|
||||||
|
|
||||||
|
// statsFetcher is used by autopilot to check the status of the other
|
||||||
|
// Consul servers.
|
||||||
|
statsFetcher *StatsFetcher
|
||||||
|
|
||||||
// tombstoneGC is used to track the pending GC invocations
|
// tombstoneGC is used to track the pending GC invocations
|
||||||
// for the KV tombstones
|
// for the KV tombstones
|
||||||
tombstoneGC *state.TombstoneGC
|
tombstoneGC *state.TombstoneGC
|
||||||
@ -255,6 +259,9 @@ func NewServer(config *Config) (*Server, error) {
|
|||||||
}
|
}
|
||||||
s.autopilotPolicy = &BasicAutopilot{s}
|
s.autopilotPolicy = &BasicAutopilot{s}
|
||||||
|
|
||||||
|
// Initialize the stats fetcher that autopilot will use.
|
||||||
|
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)
|
||||||
|
|
||||||
// Initialize the authoritative ACL cache.
|
// Initialize the authoritative ACL cache.
|
||||||
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
|
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
97
consul/stats_fetcher.go
Normal file
97
consul/stats_fetcher.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
|
||||||
|
// stats in parallel so we are taking a sample as close to the same time as
|
||||||
|
// possible, since we are comparing time-sensitive info for the health check.
|
||||||
|
// Second, it bounds the time so that one slow RPC can't hold up the health
|
||||||
|
// check loop; as a side effect of how it implements this, it also limits to
|
||||||
|
// a single in-flight RPC to any given server, so goroutines don't accumulate
|
||||||
|
// as we run the health check fairly frequently.
|
||||||
|
type StatsFetcher struct {
|
||||||
|
logger *log.Logger
|
||||||
|
pool *ConnPool
|
||||||
|
datacenter string
|
||||||
|
inflight map[string]struct{}
|
||||||
|
inflightLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStatsFetcher returns a stats fetcher.
|
||||||
|
func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher {
|
||||||
|
return &StatsFetcher{
|
||||||
|
logger: logger,
|
||||||
|
pool: pool,
|
||||||
|
datacenter: datacenter,
|
||||||
|
inflight: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch does the RPC to fetch the server stats from a single server. We don't
|
||||||
|
// cancel this when the context is canceled because we only want one in-flight
|
||||||
|
// RPC to each server, so we let it finish and then clean up the in-flight
|
||||||
|
// tracking.
|
||||||
|
func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) {
|
||||||
|
var args struct{}
|
||||||
|
var reply structs.ServerStats
|
||||||
|
err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
|
||||||
|
if err != nil {
|
||||||
|
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
|
||||||
|
server.Name, err)
|
||||||
|
} else {
|
||||||
|
replyCh <- &reply
|
||||||
|
}
|
||||||
|
|
||||||
|
f.inflightLock.Lock()
|
||||||
|
delete(f.inflight, server.ID)
|
||||||
|
f.inflightLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch will attempt to query all the servers in parallel.
|
||||||
|
func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats {
|
||||||
|
type workItem struct {
|
||||||
|
server *agent.Server
|
||||||
|
replyCh chan *structs.ServerStats
|
||||||
|
}
|
||||||
|
var work []*workItem
|
||||||
|
|
||||||
|
// Skip any servers that have inflight requests.
|
||||||
|
f.inflightLock.Lock()
|
||||||
|
for _, server := range servers {
|
||||||
|
if _, ok := f.inflight[server.ID]; ok {
|
||||||
|
f.logger.Printf("[WARN] consul: error getting server health from %q: last request still outstanding",
|
||||||
|
server.Name)
|
||||||
|
} else {
|
||||||
|
workItem := &workItem{
|
||||||
|
server: server,
|
||||||
|
replyCh: make(chan *structs.ServerStats, 1),
|
||||||
|
}
|
||||||
|
work = append(work, workItem)
|
||||||
|
f.inflight[server.ID] = struct{}{}
|
||||||
|
go f.fetch(workItem.server, workItem.replyCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.inflightLock.Unlock()
|
||||||
|
|
||||||
|
// Now wait for the results to come in, or for the context to be
|
||||||
|
// canceled.
|
||||||
|
replies := make(map[string]*structs.ServerStats)
|
||||||
|
for _, workItem := range work {
|
||||||
|
select {
|
||||||
|
case reply := <-workItem.replyCh:
|
||||||
|
replies[workItem.server.ID] = reply
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
f.logger.Printf("[WARN] consul: error getting server health from %q: %v",
|
||||||
|
workItem.server.Name, ctx.Err())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return replies
|
||||||
|
}
|
109
consul/stats_fetcher_test.go
Normal file
109
consul/stats_fetcher_test.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatsFetcher(t *testing.T) {
|
||||||
|
dir1, s1 := testServerDCExpect(t, "dc1", 3)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
|
||||||
|
dir2, s2 := testServerDCExpect(t, "dc1", 3)
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
|
||||||
|
dir3, s3 := testServerDCExpect(t, "dc1", 3)
|
||||||
|
defer os.RemoveAll(dir3)
|
||||||
|
defer s3.Shutdown()
|
||||||
|
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
members := s1.serfLAN.Members()
|
||||||
|
if len(members) != 3 {
|
||||||
|
t.Fatalf("bad len: %d", len(members))
|
||||||
|
}
|
||||||
|
|
||||||
|
var servers []*agent.Server
|
||||||
|
for _, member := range members {
|
||||||
|
ok, server := agent.IsConsulServer(member)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("bad: %#v", member)
|
||||||
|
}
|
||||||
|
servers = append(servers, server)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do a normal fetch and make sure we get three responses.
|
||||||
|
func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
stats := s1.statsFetcher.Fetch(ctx, servers)
|
||||||
|
if len(stats) != 3 {
|
||||||
|
t.Fatalf("bad: %#v", stats)
|
||||||
|
}
|
||||||
|
for id, stat := range stats {
|
||||||
|
switch types.NodeID(id) {
|
||||||
|
case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID:
|
||||||
|
// OK
|
||||||
|
default:
|
||||||
|
t.Fatalf("bad: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat == nil || stat.LastTerm == 0 {
|
||||||
|
t.Fatalf("bad: %#v", stat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Fake an in-flight request to server 3 and make sure we don't fetch
|
||||||
|
// from it.
|
||||||
|
func() {
|
||||||
|
s1.statsFetcher.inflight[string(s3.config.NodeID)] = struct{}{}
|
||||||
|
defer delete(s1.statsFetcher.inflight, string(s3.config.NodeID))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
stats := s1.statsFetcher.Fetch(ctx, servers)
|
||||||
|
if len(stats) != 2 {
|
||||||
|
t.Fatalf("bad: %#v", stats)
|
||||||
|
}
|
||||||
|
for id, stat := range stats {
|
||||||
|
switch types.NodeID(id) {
|
||||||
|
case s1.config.NodeID, s2.config.NodeID:
|
||||||
|
// OK
|
||||||
|
case s3.config.NodeID:
|
||||||
|
t.Fatalf("bad")
|
||||||
|
default:
|
||||||
|
t.Fatalf("bad: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat == nil || stat.LastTerm == 0 {
|
||||||
|
t.Fatalf("bad: %#v", stat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Do a fetch with a canceled context and make sure we bail right away.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
cancel()
|
||||||
|
stats := s1.statsFetcher.Fetch(ctx, servers)
|
||||||
|
if len(stats) != 0 {
|
||||||
|
t.Fatalf("bad: %#v", stats)
|
||||||
|
}
|
||||||
|
}
|
@ -139,7 +139,7 @@ type ServerHealth struct {
|
|||||||
|
|
||||||
// IsHealthy determines whether this ServerHealth is considered healthy
|
// IsHealthy determines whether this ServerHealth is considered healthy
|
||||||
// based on the given Autopilot config
|
// based on the given Autopilot config
|
||||||
func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotConf *AutopilotConfig) bool {
|
func (h *ServerHealth) IsHealthy(lastTerm uint64, leaderLastIndex uint64, autopilotConf *AutopilotConfig) bool {
|
||||||
if h.SerfStatus != serf.StatusAlive {
|
if h.SerfStatus != serf.StatusAlive {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -152,7 +152,7 @@ func (h *ServerHealth) IsHealthy(lastTerm uint64, lastIndex uint64, autopilotCon
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if lastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < lastIndex-autopilotConf.MaxTrailingLogs {
|
if leaderLastIndex > autopilotConf.MaxTrailingLogs && h.LastIndex < leaderLastIndex-autopilotConf.MaxTrailingLogs {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user