peering: add peer health metric (#14004)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
alex 2022-08-25 16:32:59 -07:00 committed by GitHub
parent fead3c537b
commit 30ff2e9a35
8 changed files with 262 additions and 47 deletions

View File

@@ -31,11 +31,18 @@ import (
)
var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"}
var leaderHealthyPeeringKey = []string{"consul", "peering", "healthy"}
var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
{
Name: leaderExportedServicesCountKey,
Help: "A gauge that tracks how many services are exported for the peering. " +
"The labels are \"peering\" and, for enterprise, \"partition\". " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
{
Name: leaderHealthyPeeringKey,
Help: "A gauge that tracks how if a peering is healthy (1) or not (0). " +
"The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
}
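For reference, a minimal standalone sketch of how a labeled gauge like this flattens into a sink key; the "us-west" prefix and the label values are hypothetical, chosen to mirror the leader metrics test further down:

package main

import (
	"fmt"
	"time"

	"github.com/armon/go-metrics"
)

func main() {
	// In-memory sink, like the one the leader metrics test uses.
	sink := metrics.NewInmemSink(10*time.Second, time.Minute)
	cfg := metrics.DefaultConfig("us-west")
	cfg.EnableHostname = false
	cfg.EnableRuntimeMetrics = false
	m, err := metrics.New(cfg, sink)
	if err != nil {
		panic(err)
	}

	// Emit the healthy gauge with hypothetical label values.
	m.SetGaugeWithLabels(
		[]string{"consul", "peering", "healthy"},
		1,
		[]metrics.Label{
			{Name: "peer_name", Value: "my-peer-s3"},
			{Name: "peer_id", Value: "aa2a4712-0a23-47d9-81b9-a0f30600b162"},
		},
	)

	// Prints: us-west.consul.peering.healthy;peer_name=my-peer-s3;peer_id=aa2a...
	for key := range sink.Data()[0].Gauges {
		fmt.Println(key)
	}
}
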
@@ -85,13 +92,6 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric
}
for _, peer := range peers {
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if !found {
logger.Trace("did not find status for", "peer_name", peer.Name)
continue
}
esc := status.GetExportedServicesCount()
part := peer.Partition
labels := []metrics.Label{
{Name: "peer_name", Value: peer.Name},
@@ -101,9 +101,27 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric
labels = append(labels, metrics.Label{Name: "partition", Value: part})
}
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found {
// exported services count metric
esc := status.GetExportedServicesCount()
metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
}
// peering health metric
if status.NeverConnected {
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(math.NaN()), labels)
} else {
healthy := status.IsHealthy()
healthyInt := 0
if healthy {
healthyInt = 1
}
metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(healthyInt), labels)
}
}
return nil
}
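Never-connected peerings deliberately report NaN rather than 0, so dashboards can tell "never connected" apart from "unhealthy". A minimal sketch of why that sentinel survives the float32 gauge API:

package main

import (
	"fmt"
	"math"
)

func main() {
	// NaN survives the float64 -> float32 conversion used by SetGaugeWithLabels.
	v := float32(math.NaN())
	fmt.Println(math.IsNaN(float64(v))) // true

	// Connected peerings emit plain 1 or 0, which is never NaN.
	fmt.Println(math.IsNaN(float64(1)), math.IsNaN(float64(0))) // false false
}
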

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math"
"testing"
"time"
@@ -974,6 +975,7 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
var (
s2PeerID1 = generateUUID()
s2PeerID2 = generateUUID()
s2PeerID3 = generateUUID()
testContextTimeout = 60 * time.Second
lastIdx = uint64(0)
)
@@ -1063,6 +1065,24 @@
// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
// pretend that the heartbeat happened
mst2.TrackRecvHeartbeat()
}
// Simulate a peering that never connects
{
p3 := &pbpeering.Peering{
ID: s2PeerID3,
Name: "my-peer-s4",
PeerID: token.PeerID, // doesn't much matter what these values are
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p3.ShouldDial())
lastIdx++
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p3}))
}
// set up a metrics sink
@@ -1092,6 +1112,18 @@
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2))
require.Equal(r, float32(2), metric2.Value) // for d, e services
keyHealthyMetric2 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s3;peer_id=%s", s2PeerID2)
healthyMetric2, ok := intv.Gauges[keyHealthyMetric2]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric2))
require.Equal(r, float32(1), healthyMetric2.Value)
keyHealthyMetric3 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s4;peer_id=%s", s2PeerID3)
healthyMetric3, ok := intv.Gauges[keyHealthyMetric3]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric3))
require.True(r, math.IsNaN(float64(healthyMetric3.Value)))
})
}

View File

@@ -742,6 +742,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
})
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
s.peerStreamServer.Register(s.externalGRPCServer)
// Initialize internal gRPC server.

View File

@@ -42,8 +42,8 @@ type Config struct {
// outgoingHeartbeatInterval is how often we send a heartbeat.
outgoingHeartbeatInterval time.Duration
// incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
incomingHeartbeatTimeout time.Duration
// IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
IncomingHeartbeatTimeout time.Duration
}
//go:generate mockery --name ACLResolver --inpackage
@@ -63,8 +63,8 @@ func NewServer(cfg Config) *Server {
if cfg.outgoingHeartbeatInterval == 0 {
cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval
}
if cfg.incomingHeartbeatTimeout == 0 {
cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
if cfg.IncomingHeartbeatTimeout == 0 {
cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &Server{
Config: cfg,
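Exporting the field leaves the zero-value defaulting above intact; a minimal standalone sketch of that pattern (hypothetical names; the 2-minute constant is only a stand-in for this package's defaultIncomingHeartbeatTimeout):

package main

import (
	"fmt"
	"time"
)

// Stand-in for the package-private default.
const defaultIncomingHeartbeatTimeout = 2 * time.Minute

type Config struct {
	IncomingHeartbeatTimeout time.Duration
}

func newConfig(cfg Config) Config {
	// A zero value falls back to the default, exactly as NewServer does above.
	if cfg.IncomingHeartbeatTimeout == 0 {
		cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
	}
	return cfg
}

func main() {
	fmt.Println(newConfig(Config{}).IncomingHeartbeatTimeout)                                           // 2m0s
	fmt.Println(newConfig(Config{IncomingHeartbeatTimeout: 10 * time.Second}).IncomingHeartbeatTimeout) // 10s
}
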

View File

@@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// incomingHeartbeatCtx will complete if incoming heartbeats time out.
incomingHeartbeatCtx, incomingHeartbeatCtxCancel :=
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
// NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're
// re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned
// value, not the current value.
@@ -605,7 +605,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// They just can't trace the execution properly for some reason (possibly golang/go#29587).
//nolint:govet
incomingHeartbeatCtx, incomingHeartbeatCtxCancel =
context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout)
}
case update := <-subCh:
@@ -642,6 +642,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if err := streamSend(replResp); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}
status.TrackSendSuccess()
}
}
}

View File

@@ -572,7 +572,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
})
var lastSendSuccess time.Time
var lastSendAck, lastSendSuccess time.Time
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{
@@ -587,19 +587,22 @@
},
}
lastSendSuccess = it.FutureNow(1)
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC)
lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC)
err := client.Send(ack)
require.NoError(t, err)
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(testPeerID)
rStatus, ok := srv.StreamStatus(testPeerID)
require.True(r, ok)
require.Equal(r, expect, status)
require.Equal(r, expect, rStatus)
})
})
@@ -621,7 +624,8 @@
},
}
lastNack = it.FutureNow(1)
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC)
lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC)
err := client.Send(nack)
require.NoError(t, err)
@@ -629,15 +633,17 @@
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(testPeerID)
rStatus, ok := srv.StreamStatus(testPeerID)
require.True(r, ok)
require.Equal(r, expect, status)
require.Equal(r, expect, rStatus)
})
})
@@ -694,13 +700,15 @@
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@@ -753,7 +761,7 @@
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
@@ -762,6 +770,8 @@
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@@ -785,7 +795,7 @@
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastRecvResourceSuccess: lastRecvResourceSuccess,
@@ -795,6 +805,8 @@
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@@ -816,7 +828,7 @@
expect := Status{
Connected: false,
DisconnectErrorMessage: lastRecvErrorMsg,
LastAck: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
@@ -827,6 +839,8 @@
ImportedServices: map[string]struct{}{
api.String(): {},
},
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
}
retry.Run(t, func(r *retry.R) {
@@ -1129,7 +1143,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.incomingHeartbeatTimeout = 5 * time.Millisecond
c.IncomingHeartbeatTimeout = 5 * time.Millisecond
})
p := writePeeringToBeDialed(t, store, 1, "my-peer")
@@ -1236,7 +1250,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now)
c.incomingHeartbeatTimeout = incomingHeartbeatTimeout
c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout
})
p := writePeeringToBeDialed(t, store, 1, "my-peer")

View File

@@ -16,6 +16,8 @@ type Tracker struct {
// timeNow is a shim for testing.
timeNow func() time.Time
heartbeatTimeout time.Duration
}
func NewTracker() *Tracker {
@@ -33,6 +35,12 @@ func (t *Tracker) SetClock(clock func() time.Time) {
}
}
func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
t.heartbeatTimeout = heartbeatTimeout
}
// Register a stream for a given peer but do not mark it as connected.
func (t *Tracker) Register(id string) (*MutableStatus, error) {
t.mu.Lock()
@@ -44,7 +52,7 @@ func (t *Tracker) Register(id string) (*MutableStatus, error) {
func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) {
status, ok := t.streams[id]
if !ok {
status = newMutableStatus(t.timeNow, initAsConnected)
status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected)
t.streams[id] = status
return status, true, nil
}
@@ -101,7 +109,9 @@ func (t *Tracker) StreamStatus(id string) (resp Status, found bool) {
s, ok := t.streams[id]
if !ok {
return Status{}, false
return Status{
NeverConnected: true,
}, false
}
return s.GetStatus(), true
}
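A minimal standalone sketch of the two tracker changes above (simplified, hypothetical stand-in types): the heartbeat timeout is stored under the tracker's lock and stamped onto each new status, and unknown peers now report a NeverConnected sentinel instead of a bare zero value:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for this package's Tracker and Status.
type Status struct {
	NeverConnected   bool
	heartbeatTimeout time.Duration
}

type Tracker struct {
	mu               sync.RWMutex
	heartbeatTimeout time.Duration
	streams          map[string]*Status
}

func (t *Tracker) SetHeartbeatTimeout(d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.heartbeatTimeout = d
}

// connected registers a stream, stamping the configured timeout onto the
// new status the way registerLocked/newMutableStatus do.
func (t *Tracker) connected(id string) *Status {
	t.mu.Lock()
	defer t.mu.Unlock()
	timeout := t.heartbeatTimeout
	if timeout == 0 {
		timeout = 2 * time.Minute // stand-in for defaultIncomingHeartbeatTimeout
	}
	s := &Status{heartbeatTimeout: timeout}
	t.streams[id] = s
	return s
}

func (t *Tracker) StreamStatus(id string) (Status, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	s, ok := t.streams[id]
	if !ok {
		// Unknown peers report the NeverConnected sentinel rather than a
		// bare zero value, so callers can emit NaN instead of a misleading 0.
		return Status{NeverConnected: true}, false
	}
	return *s, true
}

func main() {
	tr := &Tracker{streams: map[string]*Status{}}
	tr.SetHeartbeatTimeout(5 * time.Second) // wired from server config at startup

	st, found := tr.StreamStatus("unknown-peer")
	fmt.Println(found, st.NeverConnected) // false true

	tr.connected("peer-1")
	st, found = tr.StreamStatus("peer-1")
	fmt.Println(found, st.heartbeatTimeout) // true 5s
}
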
@@ -142,9 +152,14 @@ type MutableStatus struct {
// Status contains information about the replication stream to a peer cluster.
// TODO(peering): There's a lot of fields here...
type Status struct {
heartbeatTimeout time.Duration
// Connected is true when there is an open stream for the peer.
Connected bool
// NeverConnected is true for peerings that have never connected, false otherwise.
NeverConnected bool
// DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully.
// If the stream is connected or it disconnected gracefully it will be empty.
DisconnectErrorMessage string
@@ -167,6 +182,9 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastSendSuccess tracks the time of the last success response sent into the stream.
LastSendSuccess time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time
@@ -196,10 +214,40 @@ func (s *Status) GetExportedServicesCount() uint64 {
return uint64(len(s.ExportedServices))
}
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
// IsHealthy is a convenience func that returns true/false for a peering status.
// We define a peering as unhealthy if its status satisfies one of the following:
// - If a heartbeat hasn't been received within the IncomingHeartbeatTimeout
// - If the last sent error is newer than last sent success
// - If the last received error is newer than last received success
// If none of these conditions apply, we call the peering healthy.
func (s *Status) IsHealthy() bool {
if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout {
// 1. If heartbeat hasn't been received for a while - report unhealthy
return false
}
if s.LastSendError.After(s.LastSendSuccess) {
// 2. If last sent error is newer than last sent success - report unhealthy
return false
}
if s.LastRecvError.After(s.LastRecvResourceSuccess) {
// 3. If last recv error is newer than last recv success - report unhealthy
return false
}
return true
}
func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus {
if heartbeatTimeout.Microseconds() == 0 {
heartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &MutableStatus{
Status: Status{
Connected: connected,
heartbeatTimeout: heartbeatTimeout,
NeverConnected: !connected,
},
timeNow: now,
doneCh: make(chan struct{}),
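A worked sketch of the three IsHealthy conditions above, with hypothetical timestamps. Note that a status which has never received a heartbeat carries a zero LastRecvHeartbeat, which is far in the past and so also fails the first check — the "no heartbeat, unhealthy" case in the tests below:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	heartbeatTimeout := 2 * time.Minute // hypothetical configured timeout

	// 1. Heartbeat older than the timeout -> unhealthy.
	lastRecvHeartbeat := now.Add(-3 * time.Minute)
	fmt.Println(now.Sub(lastRecvHeartbeat) > heartbeatTimeout) // true -> unhealthy

	// 2. Last send error newer than last send success -> unhealthy.
	lastSendSuccess := now.Add(-10 * time.Second)
	lastSendError := now.Add(-5 * time.Second)
	fmt.Println(lastSendError.After(lastSendSuccess)) // true -> unhealthy

	// 3. Last receive error newer than last resource receive success -> unhealthy.
	lastRecvResourceSuccess := now.Add(-10 * time.Second)
	lastRecvError := now.Add(-5 * time.Second)
	fmt.Println(lastRecvError.After(lastRecvResourceSuccess)) // true -> unhealthy
}
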
@@ -223,6 +271,12 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock()
}
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock()

View File

@@ -10,6 +10,97 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
)
const (
aPeerID = "63b60245-c475-426b-b314-4588d210859d"
)
func TestStatus_IsHealthy(t *testing.T) {
type testcase struct {
name string
dontConnect bool
modifierFunc func(status *MutableStatus)
expectedVal bool
heartbeatTimeout time.Duration
}
tcs := []testcase{
{
name: "never connected, unhealthy",
expectedVal: false,
dontConnect: true,
},
{
name: "no heartbeat, unhealthy",
expectedVal: false,
},
{
name: "heartbeat is not received, unhealthy",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now().Add(-1 * time.Second)
},
heartbeatTimeout: 1 * time.Second,
},
{
name: "send error before send success",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
status.LastSendSuccess = time.Now()
status.LastSendError = time.Now()
},
},
{
name: "received error before received success",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
status.LastRecvResourceSuccess = time.Now()
status.LastRecvError = time.Now()
},
},
{
name: "healthy",
expectedVal: true,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
tracker := NewTracker()
if tc.heartbeatTimeout.Microseconds() != 0 {
tracker.SetHeartbeatTimeout(tc.heartbeatTimeout)
}
if !tc.dontConnect {
st, err := tracker.Connected(aPeerID)
require.NoError(t, err)
require.True(t, st.Connected)
if tc.modifierFunc != nil {
tc.modifierFunc(st)
}
require.Equal(t, tc.expectedVal, st.IsHealthy())
} else {
st, found := tracker.StreamStatus(aPeerID)
require.False(t, found)
require.Equal(t, tc.expectedVal, st.IsHealthy())
}
})
}
}
func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
tracker := NewTracker()
peerID := "63b60245-c475-426b-b314-4588d210859d"
@@ -30,6 +121,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{
Connected: true,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
}
status, ok := tracker.StreamStatus(peerID)
@@ -57,6 +149,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{
Connected: true,
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
}
require.Equal(t, expect, status)
})
@@ -69,6 +162,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
Connected: false,
DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(),
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
}
status, ok := tracker.StreamStatus(peerID)
require.True(t, ok)
@@ -82,6 +176,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{
Connected: true,
LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
// DisconnectTime gets cleared on re-connect.
}
@@ -96,7 +191,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
status, ok := tracker.StreamStatus(peerID)
require.False(t, ok)
require.Zero(t, status)
require.Equal(t, Status{NeverConnected: true}, status)
})
}