From 4d97e2f9364bfdae39eda1297df3f47fbd7c2aca Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Fri, 26 Aug 2022 16:49:03 -0400 Subject: [PATCH] Adjust metrics reporting for peering tracker --- agent/consul/server.go | 11 ++- .../services/peerstream/server.go | 8 +- .../services/peerstream/stream_resources.go | 6 +- .../services/peerstream/stream_test.go | 38 +++------- .../services/peerstream/stream_tracker.go | 51 +++++-------- .../peerstream/stream_tracker_test.go | 76 +++++++------------ 6 files changed, 68 insertions(+), 122 deletions(-) diff --git a/agent/consul/server.go b/agent/consul/server.go index 8f2986c3eb..f92a03ccd2 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -742,7 +742,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser return s.ForwardGRPC(s.grpcConnPool, info, fn) }, }) - s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout) s.peerStreamServer.Register(s.externalGRPCServer) // Initialize internal gRPC server. @@ -1575,12 +1574,12 @@ func (s *Server) Stats() map[string]map[string]string { // GetLANCoordinate returns the coordinate of the node in the LAN gossip // pool. // -// - Clients return a single coordinate for the single gossip pool they are -// in (default, segment, or partition). +// - Clients return a single coordinate for the single gossip pool they are +// in (default, segment, or partition). // -// - Servers return one coordinate for their canonical gossip pool (i.e. -// default partition/segment) and one per segment they are also ancillary -// members of. +// - Servers return one coordinate for their canonical gossip pool (i.e. +// default partition/segment) and one per segment they are also ancillary +// members of. // // NOTE: servers do not emit coordinates for partitioned gossip pools they // are ancillary members of. diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index 6568d7bf80..7254c60c7c 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -42,8 +42,8 @@ type Config struct { // outgoingHeartbeatInterval is how often we send a heartbeat. outgoingHeartbeatInterval time.Duration - // IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection. - IncomingHeartbeatTimeout time.Duration + // incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection. + incomingHeartbeatTimeout time.Duration } //go:generate mockery --name ACLResolver --inpackage @@ -63,8 +63,8 @@ func NewServer(cfg Config) *Server { if cfg.outgoingHeartbeatInterval == 0 { cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval } - if cfg.IncomingHeartbeatTimeout == 0 { - cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout + if cfg.incomingHeartbeatTimeout == 0 { + cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout } return &Server{ Config: cfg, diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 0e6b28f45a..ad5d9d4631 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // incomingHeartbeatCtx will complete if incoming heartbeats time out. incomingHeartbeatCtx, incomingHeartbeatCtxCancel := - context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) + context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout) // NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're // re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned // value, not the current value. @@ -575,6 +575,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { status.TrackRecvResourceSuccess() } + // We are replying ACK or NACK depending on whether we successfully processed the response. if err := streamSend(reply); err != nil { return fmt.Errorf("failed to send to stream: %v", err) } @@ -605,7 +606,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // They just can't trace the execution properly for some reason (possibly golang/go#29587). //nolint:govet incomingHeartbeatCtx, incomingHeartbeatCtxCancel = - context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) + context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout) } case update := <-subCh: @@ -642,7 +643,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { if err := streamSend(replResp); err != nil { return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } - status.TrackSendSuccess() } } } diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index be4a44ec87..fcdd07422b 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -572,7 +572,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) }) - var lastSendAck, lastSendSuccess time.Time + var lastSendAck time.Time testutil.RunStep(t, "ack tracked as success", func(t *testing.T) { ack := &pbpeerstream.ReplicationMessage{ @@ -587,16 +587,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, } - lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC) - lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC) + lastSendAck = it.FutureNow(1) err := client.Send(ack) require.NoError(t, err) expect := Status{ - Connected: true, - LastAck: lastSendAck, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, + Connected: true, + LastAck: lastSendAck, } retry.Run(t, func(r *retry.R) { @@ -624,20 +621,17 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, } - lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC) - lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC) + lastNack = it.FutureNow(1) err := client.Send(nack) require.NoError(t, err) lastNackMsg = "client peer was unable to apply resource: bad bad not good" expect := Status{ - Connected: true, - LastAck: lastSendAck, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, + Connected: true, + LastAck: lastSendAck, + LastNack: lastNack, + LastNackMessage: lastNackMsg, } retry.Run(t, func(r *retry.R) { @@ -661,7 +655,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, }, } - lastRecvResourceSuccess = it.FutureNow(1) + lastRecvResourceSuccess = it.FutureNow(2) err := client.Send(resp) require.NoError(t, err) @@ -707,8 +701,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -770,8 +762,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -805,8 +795,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -839,8 +827,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ImportedServices: map[string]struct{}{ api.String(): {}, }, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, - LastSendSuccess: lastSendSuccess, } retry.Run(t, func(r *retry.R) { @@ -1143,7 +1129,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { srv, store := newTestServer(t, func(c *Config) { c.Tracker.SetClock(it.Now) - c.IncomingHeartbeatTimeout = 5 * time.Millisecond + c.incomingHeartbeatTimeout = 5 * time.Millisecond }) p := writePeeringToBeDialed(t, store, 1, "my-peer") @@ -1250,7 +1236,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) { srv, store := newTestServer(t, func(c *Config) { c.Tracker.SetClock(it.Now) - c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout + c.incomingHeartbeatTimeout = incomingHeartbeatTimeout }) p := writePeeringToBeDialed(t, store, 1, "my-peer") diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index ffde98ba32..d0dcd39770 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -16,8 +16,6 @@ type Tracker struct { // timeNow is a shim for testing. timeNow func() time.Time - - heartbeatTimeout time.Duration } func NewTracker() *Tracker { @@ -35,12 +33,6 @@ func (t *Tracker) SetClock(clock func() time.Time) { } } -func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) { - t.mu.Lock() - defer t.mu.Unlock() - t.heartbeatTimeout = heartbeatTimeout -} - // Register a stream for a given peer but do not mark it as connected. func (t *Tracker) Register(id string) (*MutableStatus, error) { t.mu.Lock() @@ -52,7 +44,7 @@ func (t *Tracker) Register(id string) (*MutableStatus, error) { func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) { status, ok := t.streams[id] if !ok { - status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected) + status = newMutableStatus(t.timeNow, initAsConnected) t.streams[id] = status return status, true, nil } @@ -152,8 +144,6 @@ type MutableStatus struct { // Status contains information about the replication stream to a peer cluster. // TODO(peering): There's a lot of fields here... type Status struct { - heartbeatTimeout time.Duration - // Connected is true when there is an open stream for the peer. Connected bool @@ -182,9 +172,6 @@ type Status struct { // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string - // LastSendSuccess tracks the time of the last success response sent into the stream. - LastSendSuccess time.Time - // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. LastRecvHeartbeat time.Time @@ -216,38 +203,40 @@ func (s *Status) GetExportedServicesCount() uint64 { // IsHealthy is a convenience func that returns true/ false for a peering status. // We define a peering as unhealthy if its status satisfies one of the following: -// - If heartbeat hasn't been received within the IncomingHeartbeatTimeout -// - If the last sent error is newer than last sent success +// - If it is disconnected +// - If the last received Nack is newer than last received Ack // - If the last received error is newer than last received success // If none of these conditions apply, we call the peering healthy. func (s *Status) IsHealthy() bool { - if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout { - // 1. If heartbeat hasn't been received for a while - report unhealthy + if !s.Connected { return false } - if s.LastSendError.After(s.LastSendSuccess) { - // 2. If last sent error is newer than last sent success - report unhealthy + // If stream is in a disconnected state, report unhealthy. + // This should be logically equivalent to s.Connected above. + if !s.DisconnectTime.IsZero() { return false } + // If last Nack is after last Ack, it means the peer is unable to + // handle our replication messages. + if s.LastNack.After(s.LastAck) { + return false + } + + // If last recv error is newer than last recv success - report unhealthy if s.LastRecvError.After(s.LastRecvResourceSuccess) { - // 3. If last recv error is newer than last recv success - report unhealthy return false } return true } -func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus { - if heartbeatTimeout.Microseconds() == 0 { - heartbeatTimeout = defaultIncomingHeartbeatTimeout - } +func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { return &MutableStatus{ Status: Status{ - Connected: connected, - heartbeatTimeout: heartbeatTimeout, - NeverConnected: !connected, + Connected: connected, + NeverConnected: !connected, }, timeNow: now, doneCh: make(chan struct{}), @@ -271,12 +260,6 @@ func (s *MutableStatus) TrackSendError(error string) { s.mu.Unlock() } -func (s *MutableStatus) TrackSendSuccess() { - s.mu.Lock() - s.LastSendSuccess = s.timeNow().UTC() - s.mu.Unlock() -} - // TrackRecvResourceSuccess tracks receiving a replicated resource. func (s *MutableStatus) TrackRecvResourceSuccess() { s.mu.Lock() diff --git a/agent/grpc-external/services/peerstream/stream_tracker_test.go b/agent/grpc-external/services/peerstream/stream_tracker_test.go index 8cdcbc79a2..7500ccd4b8 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker_test.go +++ b/agent/grpc-external/services/peerstream/stream_tracker_test.go @@ -16,11 +16,10 @@ const ( func TestStatus_IsHealthy(t *testing.T) { type testcase struct { - name string - dontConnect bool - modifierFunc func(status *MutableStatus) - expectedVal bool - heartbeatTimeout time.Duration + name string + dontConnect bool + modifierFunc func(status *MutableStatus) + expectedVal bool } tcs := []testcase{ @@ -30,56 +29,39 @@ func TestStatus_IsHealthy(t *testing.T) { dontConnect: true, }, { - name: "no heartbeat, unhealthy", - expectedVal: false, - }, - { - name: "heartbeat is not received, unhealthy", + name: "disconnect time not zero", expectedVal: false, modifierFunc: func(status *MutableStatus) { - // set heartbeat - status.LastRecvHeartbeat = time.Now().Add(-1 * time.Second) - }, - heartbeatTimeout: 1 * time.Second, - }, - { - name: "send error before send success", - expectedVal: false, - modifierFunc: func(status *MutableStatus) { - // set heartbeat - status.LastRecvHeartbeat = time.Now() - - status.LastSendSuccess = time.Now() - status.LastSendError = time.Now() + status.DisconnectTime = time.Now() }, }, { - name: "received error before received success", + name: "receive error before receive success", expectedVal: false, modifierFunc: func(status *MutableStatus) { - // set heartbeat - status.LastRecvHeartbeat = time.Now() - - status.LastRecvResourceSuccess = time.Now() - status.LastRecvError = time.Now() + now := time.Now() + status.LastRecvResourceSuccess = now + status.LastRecvError = now.Add(1 * time.Second) + }, + }, + { + name: "receive error before receive success", + expectedVal: false, + modifierFunc: func(status *MutableStatus) { + now := time.Now() + status.LastAck = now + status.LastNack = now.Add(1 * time.Second) }, }, { name: "healthy", expectedVal: true, - modifierFunc: func(status *MutableStatus) { - // set heartbeat - status.LastRecvHeartbeat = time.Now() - }, }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { tracker := NewTracker() - if tc.heartbeatTimeout.Microseconds() != 0 { - tracker.SetHeartbeatTimeout(tc.heartbeatTimeout) - } if !tc.dontConnect { st, err := tracker.Connected(aPeerID) @@ -120,8 +102,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { require.NoError(t, err) expect := Status{ - Connected: true, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, + Connected: true, } status, ok := tracker.StreamStatus(peerID) @@ -147,9 +128,8 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { lastSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() expect := Status{ - Connected: true, - LastAck: lastSuccess, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, + Connected: true, + LastAck: lastSuccess, } require.Equal(t, expect, status) }) @@ -159,10 +139,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { sequence++ expect := Status{ - Connected: false, - DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), - LastAck: lastSuccess, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, + Connected: false, + DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), + LastAck: lastSuccess, } status, ok := tracker.StreamStatus(peerID) require.True(t, ok) @@ -174,9 +153,8 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { require.NoError(t, err) expect := Status{ - Connected: true, - LastAck: lastSuccess, - heartbeatTimeout: defaultIncomingHeartbeatTimeout, + Connected: true, + LastAck: lastSuccess, // DisconnectTime gets cleared on re-connect. }