diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index d1823b026b..00128bcd87 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -31,11 +31,18 @@ import ( ) var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"} +var leaderHealthyPeeringKey = []string{"consul", "peering", "healthy"} var LeaderPeeringMetrics = []prometheus.GaugeDefinition{ { Name: leaderExportedServicesCountKey, Help: "A gauge that tracks how many services are exported for the peering. " + - "The labels are \"peering\" and, for enterprise, \"partition\". " + + "The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " + + "We emit this metric every 9 seconds", + }, + { + Name: leaderHealthyPeeringKey, + Help: "A gauge that tracks how if a peering is healthy (1) or not (0). " + + "The labels are \"peer_name\", \"peer_id\" and, for enterprise, \"partition\". " + "We emit this metric every 9 seconds", }, } @@ -85,13 +92,6 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric } for _, peer := range peers { - status, found := s.peerStreamServer.StreamStatus(peer.ID) - if !found { - logger.Trace("did not find status for", "peer_name", peer.Name) - continue - } - - esc := status.GetExportedServicesCount() part := peer.Partition labels := []metrics.Label{ {Name: "peer_name", Value: peer.Name}, @@ -101,7 +101,25 @@ func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metric labels = append(labels, metrics.Label{Name: "partition", Value: part}) } - metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels) + status, found := s.peerStreamServer.StreamStatus(peer.ID) + if found { + // exported services count metric + esc := status.GetExportedServicesCount() + metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels) + } + + // peering health metric + if status.NeverConnected { + metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(math.NaN()), labels) + } else { + healthy := status.IsHealthy() + healthyInt := 0 + if healthy { + healthyInt = 1 + } + + metricsImpl.SetGaugeWithLabels(leaderHealthyPeeringKey, float32(healthyInt), labels) + } } return nil diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 46a74b6ad3..d419303852 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io/ioutil" + "math" "testing" "time" @@ -974,6 +975,7 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { var ( s2PeerID1 = generateUUID() s2PeerID2 = generateUUID() + s2PeerID3 = generateUUID() testContextTimeout = 60 * time.Second lastIdx = uint64(0) ) @@ -1063,6 +1065,24 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { // mimic tracking exported services mst2.TrackExportedService(structs.ServiceName{Name: "d-service"}) mst2.TrackExportedService(structs.ServiceName{Name: "e-service"}) + + // pretend that the hearbeat happened + mst2.TrackRecvHeartbeat() + } + + // Simulate a peering that never connects + { + p3 := &pbpeering.Peering{ + ID: s2PeerID3, + Name: "my-peer-s4", + PeerID: token.PeerID, // doesn't much matter what these values are + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p3.ShouldDial()) + lastIdx++ + require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p3})) } // set up a metrics sink @@ -1092,6 +1112,18 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2)) require.Equal(r, float32(2), metric2.Value) // for d, e services + + keyHealthyMetric2 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s3;peer_id=%s", s2PeerID2) + healthyMetric2, ok := intv.Gauges[keyHealthyMetric2] + require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric2)) + + require.Equal(r, float32(1), healthyMetric2.Value) + + keyHealthyMetric3 := fmt.Sprintf("us-west.consul.peering.healthy;peer_name=my-peer-s4;peer_id=%s", s2PeerID3) + healthyMetric3, ok := intv.Gauges[keyHealthyMetric3] + require.True(r, ok, fmt.Sprintf("did not find the key %q", keyHealthyMetric3)) + + require.True(r, math.IsNaN(float64(healthyMetric3.Value))) }) } diff --git a/agent/consul/server.go b/agent/consul/server.go index 1afa74c91d..8f2986c3eb 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -742,6 +742,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser return s.ForwardGRPC(s.grpcConnPool, info, fn) }, }) + s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout) s.peerStreamServer.Register(s.externalGRPCServer) // Initialize internal gRPC server. diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index 7254c60c7c..6568d7bf80 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -42,8 +42,8 @@ type Config struct { // outgoingHeartbeatInterval is how often we send a heartbeat. outgoingHeartbeatInterval time.Duration - // incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection. - incomingHeartbeatTimeout time.Duration + // IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection. + IncomingHeartbeatTimeout time.Duration } //go:generate mockery --name ACLResolver --inpackage @@ -63,8 +63,8 @@ func NewServer(cfg Config) *Server { if cfg.outgoingHeartbeatInterval == 0 { cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval } - if cfg.incomingHeartbeatTimeout == 0 { - cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout + if cfg.IncomingHeartbeatTimeout == 0 { + cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout } return &Server{ Config: cfg, diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 657972b886..0e6b28f45a 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // incomingHeartbeatCtx will complete if incoming heartbeats time out. incomingHeartbeatCtx, incomingHeartbeatCtxCancel := - context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout) + context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) // NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're // re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned // value, not the current value. @@ -605,7 +605,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // They just can't trace the execution properly for some reason (possibly golang/go#29587). //nolint:govet incomingHeartbeatCtx, incomingHeartbeatCtxCancel = - context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout) + context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) } case update := <-subCh: @@ -642,6 +642,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { if err := streamSend(replResp); err != nil { return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } + status.TrackSendSuccess() } } } diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 49ba7be046..be4a44ec87 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -572,7 +572,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) }) - var lastSendSuccess time.Time + var lastSendAck, lastSendSuccess time.Time testutil.RunStep(t, "ack tracked as success", func(t *testing.T) { ack := &pbpeerstream.ReplicationMessage{ @@ -587,19 +587,22 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, } - lastSendSuccess = it.FutureNow(1) + lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC) + lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC) err := client.Send(ack) require.NoError(t, err) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, + Connected: true, + LastAck: lastSendAck, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { - status, ok := srv.StreamStatus(testPeerID) + rStatus, ok := srv.StreamStatus(testPeerID) require.True(r, ok) - require.Equal(r, expect, status) + require.Equal(r, expect, rStatus) }) }) @@ -621,23 +624,26 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, } - lastNack = it.FutureNow(1) + lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC) + lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC) err := client.Send(nack) require.NoError(t, err) lastNackMsg = "client peer was unable to apply resource: bad bad not good" expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, + Connected: true, + LastAck: lastSendAck, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { - status, ok := srv.StreamStatus(testPeerID) + rStatus, ok := srv.StreamStatus(testPeerID) require.True(r, ok) - require.Equal(r, expect, status) + require.Equal(r, expect, rStatus) }) }) @@ -694,13 +700,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, - LastAck: lastSendSuccess, + LastAck: lastSendAck, LastNack: lastNack, LastNackMessage: lastNackMsg, LastRecvResourceSuccess: lastRecvResourceSuccess, ImportedServices: map[string]struct{}{ api.String(): {}, }, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -753,7 +761,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, - LastAck: lastSendSuccess, + LastAck: lastSendAck, LastNack: lastNack, LastNackMessage: lastNackMsg, LastRecvResourceSuccess: lastRecvResourceSuccess, @@ -762,6 +770,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -785,7 +795,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, - LastAck: lastSendSuccess, + LastAck: lastSendAck, LastNack: lastNack, LastNackMessage: lastNackMsg, LastRecvResourceSuccess: lastRecvResourceSuccess, @@ -795,6 +805,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -816,7 +828,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: false, DisconnectErrorMessage: lastRecvErrorMsg, - LastAck: lastSendSuccess, + LastAck: lastSendAck, LastNack: lastNack, LastNackMessage: lastNackMsg, DisconnectTime: disconnectTime, @@ -827,6 +839,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, + LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -1129,7 +1143,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { srv, store := newTestServer(t, func(c *Config) { c.Tracker.SetClock(it.Now) - c.incomingHeartbeatTimeout = 5 * time.Millisecond + c.IncomingHeartbeatTimeout = 5 * time.Millisecond }) p := writePeeringToBeDialed(t, store, 1, "my-peer") @@ -1236,7 +1250,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) { srv, store := newTestServer(t, func(c *Config) { c.Tracker.SetClock(it.Now) - c.incomingHeartbeatTimeout = incomingHeartbeatTimeout + c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout }) p := writePeeringToBeDialed(t, store, 1, "my-peer") diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index f7a451595d..ffde98ba32 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -16,6 +16,8 @@ type Tracker struct { // timeNow is a shim for testing. timeNow func() time.Time + + heartbeatTimeout time.Duration } func NewTracker() *Tracker { @@ -33,6 +35,12 @@ func (t *Tracker) SetClock(clock func() time.Time) { } } +func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) { + t.mu.Lock() + defer t.mu.Unlock() + t.heartbeatTimeout = heartbeatTimeout +} + // Register a stream for a given peer but do not mark it as connected. func (t *Tracker) Register(id string) (*MutableStatus, error) { t.mu.Lock() @@ -44,7 +52,7 @@ func (t *Tracker) Register(id string) (*MutableStatus, error) { func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) { status, ok := t.streams[id] if !ok { - status = newMutableStatus(t.timeNow, initAsConnected) + status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected) t.streams[id] = status return status, true, nil } @@ -101,7 +109,9 @@ func (t *Tracker) StreamStatus(id string) (resp Status, found bool) { s, ok := t.streams[id] if !ok { - return Status{}, false + return Status{ + NeverConnected: true, + }, false } return s.GetStatus(), true } @@ -142,9 +152,14 @@ type MutableStatus struct { // Status contains information about the replication stream to a peer cluster. // TODO(peering): There's a lot of fields here... type Status struct { + heartbeatTimeout time.Duration + // Connected is true when there is an open stream for the peer. Connected bool + // NeverConnected is true for peerings that have never connected, false otherwise. + NeverConnected bool + // DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully. // If the stream is connected or it disconnected gracefully it will be empty. DisconnectErrorMessage string @@ -167,6 +182,9 @@ type Status struct { // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string + // LastSendSuccess tracks the time of the last success response sent into the stream. + LastSendSuccess time.Time + // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. LastRecvHeartbeat time.Time @@ -196,10 +214,40 @@ func (s *Status) GetExportedServicesCount() uint64 { return uint64(len(s.ExportedServices)) } -func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { +// IsHealthy is a convenience func that returns true/ false for a peering status. +// We define a peering as unhealthy if its status satisfies one of the following: +// - If heartbeat hasn't been received within the IncomingHeartbeatTimeout +// - If the last sent error is newer than last sent success +// - If the last received error is newer than last received success +// If none of these conditions apply, we call the peering healthy. +func (s *Status) IsHealthy() bool { + if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout { + // 1. If heartbeat hasn't been received for a while - report unhealthy + return false + } + + if s.LastSendError.After(s.LastSendSuccess) { + // 2. If last sent error is newer than last sent success - report unhealthy + return false + } + + if s.LastRecvError.After(s.LastRecvResourceSuccess) { + // 3. If last recv error is newer than last recv success - report unhealthy + return false + } + + return true +} + +func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus { + if heartbeatTimeout.Microseconds() == 0 { + heartbeatTimeout = defaultIncomingHeartbeatTimeout + } return &MutableStatus{ Status: Status{ - Connected: connected, + Connected: connected, + heartbeatTimeout: heartbeatTimeout, + NeverConnected: !connected, }, timeNow: now, doneCh: make(chan struct{}), @@ -223,6 +271,12 @@ func (s *MutableStatus) TrackSendError(error string) { s.mu.Unlock() } +func (s *MutableStatus) TrackSendSuccess() { + s.mu.Lock() + s.LastSendSuccess = s.timeNow().UTC() + s.mu.Unlock() +} + // TrackRecvResourceSuccess tracks receiving a replicated resource. func (s *MutableStatus) TrackRecvResourceSuccess() { s.mu.Lock() diff --git a/agent/grpc-external/services/peerstream/stream_tracker_test.go b/agent/grpc-external/services/peerstream/stream_tracker_test.go index f7a9df321d..8cdcbc79a2 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker_test.go +++ b/agent/grpc-external/services/peerstream/stream_tracker_test.go @@ -10,6 +10,97 @@ import ( "github.com/hashicorp/consul/sdk/testutil" ) +const ( + aPeerID = "63b60245-c475-426b-b314-4588d210859d" +) + +func TestStatus_IsHealthy(t *testing.T) { + type testcase struct { + name string + dontConnect bool + modifierFunc func(status *MutableStatus) + expectedVal bool + heartbeatTimeout time.Duration + } + + tcs := []testcase{ + { + name: "never connected, unhealthy", + expectedVal: false, + dontConnect: true, + }, + { + name: "no heartbeat, unhealthy", + expectedVal: false, + }, + { + name: "heartbeat is not received, unhealthy", + expectedVal: false, + modifierFunc: func(status *MutableStatus) { + // set heartbeat + status.LastRecvHeartbeat = time.Now().Add(-1 * time.Second) + }, + heartbeatTimeout: 1 * time.Second, + }, + { + name: "send error before send success", + expectedVal: false, + modifierFunc: func(status *MutableStatus) { + // set heartbeat + status.LastRecvHeartbeat = time.Now() + + status.LastSendSuccess = time.Now() + status.LastSendError = time.Now() + }, + }, + { + name: "received error before received success", + expectedVal: false, + modifierFunc: func(status *MutableStatus) { + // set heartbeat + status.LastRecvHeartbeat = time.Now() + + status.LastRecvResourceSuccess = time.Now() + status.LastRecvError = time.Now() + }, + }, + { + name: "healthy", + expectedVal: true, + modifierFunc: func(status *MutableStatus) { + // set heartbeat + status.LastRecvHeartbeat = time.Now() + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tracker := NewTracker() + if tc.heartbeatTimeout.Microseconds() != 0 { + tracker.SetHeartbeatTimeout(tc.heartbeatTimeout) + } + + if !tc.dontConnect { + st, err := tracker.Connected(aPeerID) + require.NoError(t, err) + require.True(t, st.Connected) + + if tc.modifierFunc != nil { + tc.modifierFunc(st) + } + + require.Equal(t, tc.expectedVal, st.IsHealthy()) + + } else { + st, found := tracker.StreamStatus(aPeerID) + require.False(t, found) + require.Equal(t, tc.expectedVal, st.IsHealthy()) + } + }) + } +} + func TestTracker_EnsureConnectedDisconnected(t *testing.T) { tracker := NewTracker() peerID := "63b60245-c475-426b-b314-4588d210859d" @@ -29,7 +120,8 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { require.NoError(t, err) expect := Status{ - Connected: true, + Connected: true, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, } status, ok := tracker.StreamStatus(peerID) @@ -55,8 +147,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { lastSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() expect := Status{ - Connected: true, - LastAck: lastSuccess, + Connected: true, + LastAck: lastSuccess, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, } require.Equal(t, expect, status) }) @@ -66,9 +159,10 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { sequence++ expect := Status{ - Connected: false, - DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), - LastAck: lastSuccess, + Connected: false, + DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), + LastAck: lastSuccess, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, } status, ok := tracker.StreamStatus(peerID) require.True(t, ok) @@ -80,8 +174,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { require.NoError(t, err) expect := Status{ - Connected: true, - LastAck: lastSuccess, + Connected: true, + LastAck: lastSuccess, + heartbeatTimeout: defaultIncomingHeartbeatTimeout, // DisconnectTime gets cleared on re-connect. } @@ -96,7 +191,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { status, ok := tracker.StreamStatus(peerID) require.False(t, ok) - require.Zero(t, status) + require.Equal(t, Status{NeverConnected: true}, status) }) }