Adjust metrics reporting for peering tracker

This commit is contained in:
Chris S. Kim 2022-08-26 16:49:03 -04:00
parent b1ba0a89bc
commit 4d97e2f936
6 changed files with 68 additions and 122 deletions

View File

@ -742,7 +742,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
return s.ForwardGRPC(s.grpcConnPool, info, fn) return s.ForwardGRPC(s.grpcConnPool, info, fn)
}, },
}) })
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
s.peerStreamServer.Register(s.externalGRPCServer) s.peerStreamServer.Register(s.externalGRPCServer)
// Initialize internal gRPC server. // Initialize internal gRPC server.

View File

@ -42,8 +42,8 @@ type Config struct {
// outgoingHeartbeatInterval is how often we send a heartbeat. // outgoingHeartbeatInterval is how often we send a heartbeat.
outgoingHeartbeatInterval time.Duration outgoingHeartbeatInterval time.Duration
// IncomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection. // incomingHeartbeatTimeout is how long we'll wait between receiving heartbeats before we close the connection.
IncomingHeartbeatTimeout time.Duration incomingHeartbeatTimeout time.Duration
} }
//go:generate mockery --name ACLResolver --inpackage //go:generate mockery --name ACLResolver --inpackage
@ -63,8 +63,8 @@ func NewServer(cfg Config) *Server {
if cfg.outgoingHeartbeatInterval == 0 { if cfg.outgoingHeartbeatInterval == 0 {
cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval cfg.outgoingHeartbeatInterval = defaultOutgoingHeartbeatInterval
} }
if cfg.IncomingHeartbeatTimeout == 0 { if cfg.incomingHeartbeatTimeout == 0 {
cfg.IncomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout cfg.incomingHeartbeatTimeout = defaultIncomingHeartbeatTimeout
} }
return &Server{ return &Server{
Config: cfg, Config: cfg,

View File

@ -406,7 +406,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// incomingHeartbeatCtx will complete if incoming heartbeats time out. // incomingHeartbeatCtx will complete if incoming heartbeats time out.
incomingHeartbeatCtx, incomingHeartbeatCtxCancel := incomingHeartbeatCtx, incomingHeartbeatCtxCancel :=
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
// NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're // NOTE: It's important that we wrap the call to cancel in a wrapper func because during the loop we're
// re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned // re-assigning the value of incomingHeartbeatCtxCancel and we want the defer to run on the last assigned
// value, not the current value. // value, not the current value.
@ -575,6 +575,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
status.TrackRecvResourceSuccess() status.TrackRecvResourceSuccess()
} }
// We are replying ACK or NACK depending on whether we successfully processed the response.
if err := streamSend(reply); err != nil { if err := streamSend(reply); err != nil {
return fmt.Errorf("failed to send to stream: %v", err) return fmt.Errorf("failed to send to stream: %v", err)
} }
@ -605,7 +606,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// They just can't trace the execution properly for some reason (possibly golang/go#29587). // They just can't trace the execution properly for some reason (possibly golang/go#29587).
//nolint:govet //nolint:govet
incomingHeartbeatCtx, incomingHeartbeatCtxCancel = incomingHeartbeatCtx, incomingHeartbeatCtxCancel =
context.WithTimeout(context.Background(), s.IncomingHeartbeatTimeout) context.WithTimeout(context.Background(), s.incomingHeartbeatTimeout)
} }
case update := <-subCh: case update := <-subCh:
@ -642,7 +643,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if err := streamSend(replResp); err != nil { if err := streamSend(replResp); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
} }
status.TrackSendSuccess()
} }
} }
} }

View File

@ -572,7 +572,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}) })
}) })
var lastSendAck, lastSendSuccess time.Time var lastSendAck time.Time
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) { testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{ ack := &pbpeerstream.ReplicationMessage{
@ -587,16 +587,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
} }
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 2, 0, time.UTC) lastSendAck = it.FutureNow(1)
lastSendSuccess = time.Date(2000, time.January, 1, 0, 0, 3, 0, time.UTC)
err := client.Send(ack) err := client.Send(ack)
require.NoError(t, err) require.NoError(t, err)
expect := Status{ expect := Status{
Connected: true, Connected: true,
LastAck: lastSendAck, LastAck: lastSendAck,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -624,8 +621,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
} }
lastSendAck = time.Date(2000, time.January, 1, 0, 0, 4, 0, time.UTC) lastNack = it.FutureNow(1)
lastNack = time.Date(2000, time.January, 1, 0, 0, 5, 0, time.UTC)
err := client.Send(nack) err := client.Send(nack)
require.NoError(t, err) require.NoError(t, err)
@ -636,8 +632,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastAck: lastSendAck, LastAck: lastSendAck,
LastNack: lastNack, LastNack: lastNack,
LastNackMessage: lastNackMsg, LastNackMessage: lastNackMsg,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -661,7 +655,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
}, },
} }
lastRecvResourceSuccess = it.FutureNow(1) lastRecvResourceSuccess = it.FutureNow(2)
err := client.Send(resp) err := client.Send(resp)
require.NoError(t, err) require.NoError(t, err)
@ -707,8 +701,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{ ImportedServices: map[string]struct{}{
api.String(): {}, api.String(): {},
}, },
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -770,8 +762,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{ ImportedServices: map[string]struct{}{
api.String(): {}, api.String(): {},
}, },
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -805,8 +795,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{ ImportedServices: map[string]struct{}{
api.String(): {}, api.String(): {},
}, },
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -839,8 +827,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
ImportedServices: map[string]struct{}{ ImportedServices: map[string]struct{}{
api.String(): {}, api.String(): {},
}, },
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
LastSendSuccess: lastSendSuccess,
} }
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -1143,7 +1129,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) { srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now) c.Tracker.SetClock(it.Now)
c.IncomingHeartbeatTimeout = 5 * time.Millisecond c.incomingHeartbeatTimeout = 5 * time.Millisecond
}) })
p := writePeeringToBeDialed(t, store, 1, "my-peer") p := writePeeringToBeDialed(t, store, 1, "my-peer")
@ -1250,7 +1236,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) { srv, store := newTestServer(t, func(c *Config) {
c.Tracker.SetClock(it.Now) c.Tracker.SetClock(it.Now)
c.IncomingHeartbeatTimeout = incomingHeartbeatTimeout c.incomingHeartbeatTimeout = incomingHeartbeatTimeout
}) })
p := writePeeringToBeDialed(t, store, 1, "my-peer") p := writePeeringToBeDialed(t, store, 1, "my-peer")

View File

@ -16,8 +16,6 @@ type Tracker struct {
// timeNow is a shim for testing. // timeNow is a shim for testing.
timeNow func() time.Time timeNow func() time.Time
heartbeatTimeout time.Duration
} }
func NewTracker() *Tracker { func NewTracker() *Tracker {
@ -35,12 +33,6 @@ func (t *Tracker) SetClock(clock func() time.Time) {
} }
} }
func (t *Tracker) SetHeartbeatTimeout(heartbeatTimeout time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
t.heartbeatTimeout = heartbeatTimeout
}
// Register a stream for a given peer but do not mark it as connected. // Register a stream for a given peer but do not mark it as connected.
func (t *Tracker) Register(id string) (*MutableStatus, error) { func (t *Tracker) Register(id string) (*MutableStatus, error) {
t.mu.Lock() t.mu.Lock()
@ -52,7 +44,7 @@ func (t *Tracker) Register(id string) (*MutableStatus, error) {
func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) { func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) {
status, ok := t.streams[id] status, ok := t.streams[id]
if !ok { if !ok {
status = newMutableStatus(t.timeNow, t.heartbeatTimeout, initAsConnected) status = newMutableStatus(t.timeNow, initAsConnected)
t.streams[id] = status t.streams[id] = status
return status, true, nil return status, true, nil
} }
@ -152,8 +144,6 @@ type MutableStatus struct {
// Status contains information about the replication stream to a peer cluster. // Status contains information about the replication stream to a peer cluster.
// TODO(peering): There's a lot of fields here... // TODO(peering): There's a lot of fields here...
type Status struct { type Status struct {
heartbeatTimeout time.Duration
// Connected is true when there is an open stream for the peer. // Connected is true when there is an open stream for the peer.
Connected bool Connected bool
@ -182,9 +172,6 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream. // LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string LastSendErrorMessage string
// LastSendSuccess tracks the time of the last success response sent into the stream.
LastSendSuccess time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer. // LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time LastRecvHeartbeat time.Time
@ -216,37 +203,39 @@ func (s *Status) GetExportedServicesCount() uint64 {
// IsHealthy is a convenience func that returns true/ false for a peering status. // IsHealthy is a convenience func that returns true/ false for a peering status.
// We define a peering as unhealthy if its status satisfies one of the following: // We define a peering as unhealthy if its status satisfies one of the following:
// - If heartbeat hasn't been received within the IncomingHeartbeatTimeout // - If it is disconnected
// - If the last sent error is newer than last sent success // - If the last received Nack is newer than last received Ack
// - If the last received error is newer than last received success // - If the last received error is newer than last received success
// If none of these conditions apply, we call the peering healthy. // If none of these conditions apply, we call the peering healthy.
func (s *Status) IsHealthy() bool { func (s *Status) IsHealthy() bool {
if time.Now().Sub(s.LastRecvHeartbeat) > s.heartbeatTimeout { if !s.Connected {
// 1. If heartbeat hasn't been received for a while - report unhealthy
return false return false
} }
if s.LastSendError.After(s.LastSendSuccess) { // If stream is in a disconnected state, report unhealthy.
// 2. If last sent error is newer than last sent success - report unhealthy // This should be logically equivalent to s.Connected above.
if !s.DisconnectTime.IsZero() {
return false return false
} }
// If last Nack is after last Ack, it means the peer is unable to
// handle our replication messages.
if s.LastNack.After(s.LastAck) {
return false
}
// If last recv error is newer than last recv success - report unhealthy
if s.LastRecvError.After(s.LastRecvResourceSuccess) { if s.LastRecvError.After(s.LastRecvResourceSuccess) {
// 3. If last recv error is newer than last recv success - report unhealthy
return false return false
} }
return true return true
} }
func newMutableStatus(now func() time.Time, heartbeatTimeout time.Duration, connected bool) *MutableStatus { func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
if heartbeatTimeout.Microseconds() == 0 {
heartbeatTimeout = defaultIncomingHeartbeatTimeout
}
return &MutableStatus{ return &MutableStatus{
Status: Status{ Status: Status{
Connected: connected, Connected: connected,
heartbeatTimeout: heartbeatTimeout,
NeverConnected: !connected, NeverConnected: !connected,
}, },
timeNow: now, timeNow: now,
@ -271,12 +260,6 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock() s.mu.Unlock()
} }
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource. // TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() { func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock() s.mu.Lock()

View File

@ -20,7 +20,6 @@ func TestStatus_IsHealthy(t *testing.T) {
dontConnect bool dontConnect bool
modifierFunc func(status *MutableStatus) modifierFunc func(status *MutableStatus)
expectedVal bool expectedVal bool
heartbeatTimeout time.Duration
} }
tcs := []testcase{ tcs := []testcase{
@ -30,56 +29,39 @@ func TestStatus_IsHealthy(t *testing.T) {
dontConnect: true, dontConnect: true,
}, },
{ {
name: "no heartbeat, unhealthy", name: "disconnect time not zero",
expectedVal: false,
},
{
name: "heartbeat is not received, unhealthy",
expectedVal: false, expectedVal: false,
modifierFunc: func(status *MutableStatus) { modifierFunc: func(status *MutableStatus) {
// set heartbeat status.DisconnectTime = time.Now()
status.LastRecvHeartbeat = time.Now().Add(-1 * time.Second)
},
heartbeatTimeout: 1 * time.Second,
},
{
name: "send error before send success",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
status.LastSendSuccess = time.Now()
status.LastSendError = time.Now()
}, },
}, },
{ {
name: "received error before received success", name: "receive error before receive success",
expectedVal: false, expectedVal: false,
modifierFunc: func(status *MutableStatus) { modifierFunc: func(status *MutableStatus) {
// set heartbeat now := time.Now()
status.LastRecvHeartbeat = time.Now() status.LastRecvResourceSuccess = now
status.LastRecvError = now.Add(1 * time.Second)
status.LastRecvResourceSuccess = time.Now() },
status.LastRecvError = time.Now() },
{
name: "receive error before receive success",
expectedVal: false,
modifierFunc: func(status *MutableStatus) {
now := time.Now()
status.LastAck = now
status.LastNack = now.Add(1 * time.Second)
}, },
}, },
{ {
name: "healthy", name: "healthy",
expectedVal: true, expectedVal: true,
modifierFunc: func(status *MutableStatus) {
// set heartbeat
status.LastRecvHeartbeat = time.Now()
},
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
tracker := NewTracker() tracker := NewTracker()
if tc.heartbeatTimeout.Microseconds() != 0 {
tracker.SetHeartbeatTimeout(tc.heartbeatTimeout)
}
if !tc.dontConnect { if !tc.dontConnect {
st, err := tracker.Connected(aPeerID) st, err := tracker.Connected(aPeerID)
@ -121,7 +103,6 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{ expect := Status{
Connected: true, Connected: true,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
} }
status, ok := tracker.StreamStatus(peerID) status, ok := tracker.StreamStatus(peerID)
@ -149,7 +130,6 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{ expect := Status{
Connected: true, Connected: true,
LastAck: lastSuccess, LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
} }
require.Equal(t, expect, status) require.Equal(t, expect, status)
}) })
@ -162,7 +142,6 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
Connected: false, Connected: false,
DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(),
LastAck: lastSuccess, LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
} }
status, ok := tracker.StreamStatus(peerID) status, ok := tracker.StreamStatus(peerID)
require.True(t, ok) require.True(t, ok)
@ -176,7 +155,6 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
expect := Status{ expect := Status{
Connected: true, Connected: true,
LastAck: lastSuccess, LastAck: lastSuccess,
heartbeatTimeout: defaultIncomingHeartbeatTimeout,
// DisconnectTime gets cleared on re-connect. // DisconnectTime gets cleared on re-connect.
} }