diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 5702f0e135..000e26ca9d 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -160,9 +160,19 @@ func (s *Server) DrainStream(req HandleStreamRequest) { } } +func (s *Server) HandleStream(streamReq HandleStreamRequest) error { + if err := s.realHandleStream(streamReq); err != nil { + s.Tracker.DisconnectedDueToError(streamReq.LocalID, err.Error()) + return err + } + // TODO(peering) Also need to clear subscriptions associated with the peer + s.Tracker.DisconnectedGracefully(streamReq.LocalID) + return nil +} + // The localID provided is the locally-generated identifier for the peering. // The remoteID is an identifier that the remote peer recognizes for the peering. -func (s *Server) HandleStream(streamReq HandleStreamRequest) error { +func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // TODO: pass logger down from caller? logger := s.Logger.Named("stream"). With("peer_name", streamReq.PeerName). @@ -175,9 +185,6 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { return fmt.Errorf("failed to register stream: %v", err) } - // TODO(peering) Also need to clear subscriptions associated with the peer - defer s.Tracker.Disconnected(streamReq.LocalID) - var trustDomain string if s.ConnectEnabled { // Read the TrustDomain up front - we do not allow users to change the ClusterID @@ -454,7 +461,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) status.TrackReceiveError(err.Error()) } else { - status.TrackReceiveSuccess() + status.TrackReceiveResourceSuccess() } if err := streamSend(reply); err != nil { @@ -475,6 +482,8 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { } if msg.GetHeartbeat() != nil { + status.TrackReceiveHeartbeat() + // Reset the heartbeat timeout by creating a new context. // We first must cancel the old context so there's no leaks. This is safe to do because we're only // reading that context within this for{} loop, and so we won't accidentally trigger the heartbeat diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 41c24dc1b4..d9e18a1e62 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -423,7 +423,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) }) - var lastRecvSuccess time.Time + var lastRecvResourceSuccess time.Time testutil.RunStep(t, "response applied locally", func(t *testing.T) { resp := &pbpeerstream.ReplicationMessage{ @@ -437,7 +437,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, }, } - lastRecvSuccess = it.FutureNow(1) + lastRecvResourceSuccess = it.FutureNow(1) err := client.Send(resp) require.NoError(t, err) @@ -475,11 +475,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - LastReceiveSuccess: lastRecvSuccess, + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastReceiveResourceSuccess: lastRecvResourceSuccess, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -534,13 +534,46 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - LastReceiveSuccess: lastRecvSuccess, - LastReceiveError: lastRecvError, - LastReceiveErrorMessage: lastRecvErrorMsg, + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastReceiveResourceSuccess: lastRecvResourceSuccess, + LastReceiveError: lastRecvError, + LastReceiveErrorMessage: lastRecvErrorMsg, + ImportedServices: map[string]struct{}{ + api.String(): {}, + }, + } + + retry.Run(t, func(r *retry.R) { + status, ok := srv.StreamStatus(peerID) + require.True(r, ok) + require.Equal(r, expect, status) + }) + }) + + var lastReceiveHeartbeat time.Time + testutil.RunStep(t, "receives heartbeat", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{ + Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{}, + }, + } + lastReceiveHeartbeat = it.FutureNow(1) + err := client.Send(resp) + require.NoError(t, err) + api := structs.NewServiceName("api", nil) + + expect := Status{ + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastReceiveResourceSuccess: lastRecvResourceSuccess, + LastReceiveError: lastRecvError, + LastReceiveErrorMessage: lastRecvErrorMsg, + LastReceiveHeartbeat: lastReceiveHeartbeat, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -563,14 +596,16 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: false, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - DisconnectTime: disconnectTime, - LastReceiveSuccess: lastRecvSuccess, - LastReceiveError: lastRecvError, - LastReceiveErrorMessage: lastRecvErrorMsg, + Connected: false, + DisconnectErrorMessage: "stream ended unexpectedly", + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + DisconnectTime: disconnectTime, + LastReceiveResourceSuccess: lastRecvResourceSuccess, + LastReceiveError: lastRecvError, + LastReceiveErrorMessage: lastRecvErrorMsg, + LastReceiveHeartbeat: lastReceiveHeartbeat, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -868,8 +903,8 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { }) } -// Test that when the client doesn't send a heartbeat in time, the stream is terminated. -func TestStreamResources_Server_TerminatesOnHeartbeatTimeout(t *testing.T) { +// Test that when the client doesn't send a heartbeat in time, the stream is disconnected. +func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), } @@ -905,10 +940,13 @@ func TestStreamResources_Server_TerminatesOnHeartbeatTimeout(t *testing.T) { }) testutil.RunStep(t, "stream is disconnected due to heartbeat timeout", func(t *testing.T) { + disconnectTime := it.FutureNow(1) retry.Run(t, func(r *retry.R) { status, ok := srv.StreamStatus(peerID) require.True(r, ok) require.False(r, status.Connected) + require.Equal(r, "heartbeat timeout", status.DisconnectErrorMessage) + require.Equal(r, disconnectTime, status.DisconnectTime) }) }) } diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index c9d41e127c..8632f36eca 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -75,13 +75,23 @@ func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) { return status, nil } -// Disconnected ensures that if a peer id's stream status is tracked, it is marked as disconnected. -func (t *Tracker) Disconnected(id string) { +// DisconnectedGracefully marks the peer id's stream status as disconnected gracefully. +func (t *Tracker) DisconnectedGracefully(id string) { t.mu.Lock() defer t.mu.Unlock() if status, ok := t.streams[id]; ok { - status.TrackDisconnected() + status.TrackDisconnectedGracefully() + } +} + +// DisconnectedDueToError marks the peer id's stream status as disconnected due to an error. +func (t *Tracker) DisconnectedDueToError(id string, error string) { + t.mu.Lock() + defer t.mu.Unlock() + + if status, ok := t.streams[id]; ok { + status.TrackDisconnectedDueToError(error) } } @@ -135,6 +145,10 @@ type Status struct { // Connected is true when there is an open stream for the peer. Connected bool + // DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully. + // If the stream is connected or it disconnected gracefully it will be empty. + DisconnectErrorMessage string + // If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero. DisconnectTime time.Time @@ -153,8 +167,11 @@ type Status struct { // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string - // LastReceiveSuccess tracks the time we last successfully stored a resource replicated FROM the peer. - LastReceiveSuccess time.Time + // LastReceiveHeartbeat tracks when we last received a heartbeat from our peer. + LastReceiveHeartbeat time.Time + + // LastReceiveResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. + LastReceiveResourceSuccess time.Time // LastReceiveError tracks either: // - The time we failed to store a resource replicated FROM the peer. @@ -208,9 +225,17 @@ func (s *MutableStatus) TrackSendError(error string) { s.mu.Unlock() } -func (s *MutableStatus) TrackReceiveSuccess() { +// TrackReceiveResourceSuccess tracks receiving a replicated resource. +func (s *MutableStatus) TrackReceiveResourceSuccess() { s.mu.Lock() - s.LastReceiveSuccess = s.timeNow().UTC() + s.LastReceiveResourceSuccess = s.timeNow().UTC() + s.mu.Unlock() +} + +// TrackReceiveHeartbeat tracks receiving a heartbeat from our peer. +func (s *MutableStatus) TrackReceiveHeartbeat() { + s.mu.Lock() + s.LastReceiveHeartbeat = s.timeNow().UTC() s.mu.Unlock() } @@ -232,13 +257,27 @@ func (s *MutableStatus) TrackConnected() { s.mu.Lock() s.Connected = true s.DisconnectTime = time.Time{} + s.DisconnectErrorMessage = "" s.mu.Unlock() } -func (s *MutableStatus) TrackDisconnected() { +// TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected. +// For example, we got a terminated message, or we terminated the stream ourselves. +func (s *MutableStatus) TrackDisconnectedGracefully() { s.mu.Lock() s.Connected = false s.DisconnectTime = s.timeNow().UTC() + s.DisconnectErrorMessage = "" + s.mu.Unlock() +} + +// TrackDisconnectedDueToError tracks when the stream was disconnected due to an error. +// For example the heartbeat timed out, or we couldn't send into the stream. +func (s *MutableStatus) TrackDisconnectedDueToError(error string) { + s.mu.Lock() + s.Connected = false + s.DisconnectTime = s.timeNow().UTC() + s.DisconnectErrorMessage = error s.mu.Unlock() } diff --git a/agent/grpc-external/services/peerstream/stream_tracker_test.go b/agent/grpc-external/services/peerstream/stream_tracker_test.go index a698ccc6f4..f7a9df321d 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker_test.go +++ b/agent/grpc-external/services/peerstream/stream_tracker_test.go @@ -62,7 +62,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { }) testutil.RunStep(t, "disconnect", func(t *testing.T) { - tracker.Disconnected(peerID) + tracker.DisconnectedGracefully(peerID) sequence++ expect := Status{ @@ -147,7 +147,7 @@ func TestTracker_connectedStreams(t *testing.T) { require.NoError(t, err) // Mark foo as disconnected to avoid showing it as an active stream - status.TrackDisconnected() + status.TrackDisconnectedGracefully() _, err = s.Connected("bar") require.NoError(t, err) @@ -162,3 +162,61 @@ func TestTracker_connectedStreams(t *testing.T) { }) } } + +func TestMutableStatus_TrackConnected(t *testing.T) { + s := MutableStatus{ + Status: Status{ + Connected: false, + DisconnectTime: time.Now(), + DisconnectErrorMessage: "disconnected", + }, + } + s.TrackConnected() + + require.True(t, s.IsConnected()) + require.True(t, s.Connected) + require.Equal(t, time.Time{}, s.DisconnectTime) + require.Empty(t, s.DisconnectErrorMessage) +} + +func TestMutableStatus_TrackDisconnectedGracefully(t *testing.T) { + it := incrementalTime{ + base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), + } + disconnectTime := it.FutureNow(1) + + s := MutableStatus{ + timeNow: it.Now, + Status: Status{ + Connected: true, + }, + } + + s.TrackDisconnectedGracefully() + + require.False(t, s.IsConnected()) + require.False(t, s.Connected) + require.Equal(t, disconnectTime, s.DisconnectTime) + require.Empty(t, s.DisconnectErrorMessage) +} + +func TestMutableStatus_TrackDisconnectedDueToError(t *testing.T) { + it := incrementalTime{ + base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), + } + disconnectTime := it.FutureNow(1) + + s := MutableStatus{ + timeNow: it.Now, + Status: Status{ + Connected: true, + }, + } + + s.TrackDisconnectedDueToError("disconnect err") + + require.False(t, s.IsConnected()) + require.False(t, s.Connected) + require.Equal(t, disconnectTime, s.DisconnectTime) + require.Equal(t, "disconnect err", s.DisconnectErrorMessage) +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 72add67b07..9a471e56ac 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -511,6 +511,8 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering // reconcile pbpeering.PeeringState_Active if streamState.Connected { cp.State = pbpeering.PeeringState_ACTIVE + } else if streamState.DisconnectErrorMessage != "" { + cp.State = pbpeering.PeeringState_FAILING } // add imported & exported services counts