peering: read endpoints can now return failing status (#13849)

Track streams that have been disconnected due to an error and
set their statuses to failing.
This commit is contained in:
Luke Kysow 2022-07-25 14:27:53 -07:00 committed by GitHub
parent 93de25f87c
commit 3530d3782d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 185 additions and 39 deletions

View File

@ -160,9 +160,19 @@ func (s *Server) DrainStream(req HandleStreamRequest) {
}
}
func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
if err := s.realHandleStream(streamReq); err != nil {
s.Tracker.DisconnectedDueToError(streamReq.LocalID, err.Error())
return err
}
// TODO(peering) Also need to clear subscriptions associated with the peer
s.Tracker.DisconnectedGracefully(streamReq.LocalID)
return nil
}
// The localID provided is the locally-generated identifier for the peering.
// The remoteID is an identifier that the remote peer recognizes for the peering.
func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// TODO: pass logger down from caller?
logger := s.Logger.Named("stream").
With("peer_name", streamReq.PeerName).
@ -175,9 +185,6 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
return fmt.Errorf("failed to register stream: %v", err)
}
// TODO(peering) Also need to clear subscriptions associated with the peer
defer s.Tracker.Disconnected(streamReq.LocalID)
var trustDomain string
if s.ConnectEnabled {
// Read the TrustDomain up front - we do not allow users to change the ClusterID
@ -454,7 +461,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
status.TrackReceiveError(err.Error())
} else {
status.TrackReceiveSuccess()
status.TrackReceiveResourceSuccess()
}
if err := streamSend(reply); err != nil {
@ -475,6 +482,8 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
}
if msg.GetHeartbeat() != nil {
status.TrackReceiveHeartbeat()
// Reset the heartbeat timeout by creating a new context.
// We first must cancel the old context so there's no leaks. This is safe to do because we're only
// reading that context within this for{} loop, and so we won't accidentally trigger the heartbeat

View File

@ -423,7 +423,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
})
var lastRecvSuccess time.Time
var lastRecvResourceSuccess time.Time
testutil.RunStep(t, "response applied locally", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
@ -437,7 +437,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
},
}
lastRecvSuccess = it.FutureNow(1)
lastRecvResourceSuccess = it.FutureNow(1)
err := client.Send(resp)
require.NoError(t, err)
@ -475,11 +475,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
api := structs.NewServiceName("api", nil)
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveSuccess: lastRecvSuccess,
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveResourceSuccess: lastRecvResourceSuccess,
ImportedServices: map[string]struct{}{
api.String(): {},
},
@ -534,13 +534,46 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
api := structs.NewServiceName("api", nil)
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveSuccess: lastRecvSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveResourceSuccess: lastRecvResourceSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
ImportedServices: map[string]struct{}{
api.String(): {},
},
}
retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(peerID)
require.True(r, ok)
require.Equal(r, expect, status)
})
})
var lastReceiveHeartbeat time.Time
testutil.RunStep(t, "receives heartbeat", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
},
}
lastReceiveHeartbeat = it.FutureNow(1)
err := client.Send(resp)
require.NoError(t, err)
api := structs.NewServiceName("api", nil)
expect := Status{
Connected: true,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
LastReceiveResourceSuccess: lastRecvResourceSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
LastReceiveHeartbeat: lastReceiveHeartbeat,
ImportedServices: map[string]struct{}{
api.String(): {},
},
@ -563,14 +596,16 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
api := structs.NewServiceName("api", nil)
expect := Status{
Connected: false,
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
LastReceiveSuccess: lastRecvSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
Connected: false,
DisconnectErrorMessage: "stream ended unexpectedly",
LastAck: lastSendSuccess,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime,
LastReceiveResourceSuccess: lastRecvResourceSuccess,
LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
LastReceiveHeartbeat: lastReceiveHeartbeat,
ImportedServices: map[string]struct{}{
api.String(): {},
},
@ -868,8 +903,8 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
})
}
// Test that when the client doesn't send a heartbeat in time, the stream is terminated.
func TestStreamResources_Server_TerminatesOnHeartbeatTimeout(t *testing.T) {
// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
@ -905,10 +940,13 @@ func TestStreamResources_Server_TerminatesOnHeartbeatTimeout(t *testing.T) {
})
testutil.RunStep(t, "stream is disconnected due to heartbeat timeout", func(t *testing.T) {
disconnectTime := it.FutureNow(1)
retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(peerID)
require.True(r, ok)
require.False(r, status.Connected)
require.Equal(r, "heartbeat timeout", status.DisconnectErrorMessage)
require.Equal(r, disconnectTime, status.DisconnectTime)
})
})
}

View File

@ -75,13 +75,23 @@ func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) {
return status, nil
}
// Disconnected ensures that if a peer id's stream status is tracked, it is marked as disconnected.
func (t *Tracker) Disconnected(id string) {
// DisconnectedGracefully marks the peer id's stream status as disconnected gracefully.
func (t *Tracker) DisconnectedGracefully(id string) {
t.mu.Lock()
defer t.mu.Unlock()
if status, ok := t.streams[id]; ok {
status.TrackDisconnected()
status.TrackDisconnectedGracefully()
}
}
// DisconnectedDueToError marks the peer id's stream status as disconnected due to an error.
func (t *Tracker) DisconnectedDueToError(id string, error string) {
t.mu.Lock()
defer t.mu.Unlock()
if status, ok := t.streams[id]; ok {
status.TrackDisconnectedDueToError(error)
}
}
@ -135,6 +145,10 @@ type Status struct {
// Connected is true when there is an open stream for the peer.
Connected bool
// DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully.
// If the stream is connected or it disconnected gracefully it will be empty.
DisconnectErrorMessage string
// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
DisconnectTime time.Time
@ -153,8 +167,11 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastReceiveSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
LastReceiveSuccess time.Time
// LastReceiveHeartbeat tracks when we last received a heartbeat from our peer.
LastReceiveHeartbeat time.Time
// LastReceiveResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
LastReceiveResourceSuccess time.Time
// LastReceiveError tracks either:
// - The time we failed to store a resource replicated FROM the peer.
@ -208,9 +225,17 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock()
}
func (s *MutableStatus) TrackReceiveSuccess() {
// TrackReceiveResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackReceiveResourceSuccess() {
s.mu.Lock()
s.LastReceiveSuccess = s.timeNow().UTC()
s.LastReceiveResourceSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackReceiveHeartbeat tracks receiving a heartbeat from our peer.
func (s *MutableStatus) TrackReceiveHeartbeat() {
s.mu.Lock()
s.LastReceiveHeartbeat = s.timeNow().UTC()
s.mu.Unlock()
}
@ -232,13 +257,27 @@ func (s *MutableStatus) TrackConnected() {
s.mu.Lock()
s.Connected = true
s.DisconnectTime = time.Time{}
s.DisconnectErrorMessage = ""
s.mu.Unlock()
}
func (s *MutableStatus) TrackDisconnected() {
// TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected.
// For example, we got a terminated message, or we terminated the stream ourselves.
func (s *MutableStatus) TrackDisconnectedGracefully() {
s.mu.Lock()
s.Connected = false
s.DisconnectTime = s.timeNow().UTC()
s.DisconnectErrorMessage = ""
s.mu.Unlock()
}
// TrackDisconnectedDueToError tracks when the stream was disconnected due to an error.
// For example the heartbeat timed out, or we couldn't send into the stream.
func (s *MutableStatus) TrackDisconnectedDueToError(error string) {
s.mu.Lock()
s.Connected = false
s.DisconnectTime = s.timeNow().UTC()
s.DisconnectErrorMessage = error
s.mu.Unlock()
}

View File

@ -62,7 +62,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) {
})
testutil.RunStep(t, "disconnect", func(t *testing.T) {
tracker.Disconnected(peerID)
tracker.DisconnectedGracefully(peerID)
sequence++
expect := Status{
@ -147,7 +147,7 @@ func TestTracker_connectedStreams(t *testing.T) {
require.NoError(t, err)
// Mark foo as disconnected to avoid showing it as an active stream
status.TrackDisconnected()
status.TrackDisconnectedGracefully()
_, err = s.Connected("bar")
require.NoError(t, err)
@ -162,3 +162,61 @@ func TestTracker_connectedStreams(t *testing.T) {
})
}
}
func TestMutableStatus_TrackConnected(t *testing.T) {
s := MutableStatus{
Status: Status{
Connected: false,
DisconnectTime: time.Now(),
DisconnectErrorMessage: "disconnected",
},
}
s.TrackConnected()
require.True(t, s.IsConnected())
require.True(t, s.Connected)
require.Equal(t, time.Time{}, s.DisconnectTime)
require.Empty(t, s.DisconnectErrorMessage)
}
func TestMutableStatus_TrackDisconnectedGracefully(t *testing.T) {
it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
disconnectTime := it.FutureNow(1)
s := MutableStatus{
timeNow: it.Now,
Status: Status{
Connected: true,
},
}
s.TrackDisconnectedGracefully()
require.False(t, s.IsConnected())
require.False(t, s.Connected)
require.Equal(t, disconnectTime, s.DisconnectTime)
require.Empty(t, s.DisconnectErrorMessage)
}
func TestMutableStatus_TrackDisconnectedDueToError(t *testing.T) {
it := incrementalTime{
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
}
disconnectTime := it.FutureNow(1)
s := MutableStatus{
timeNow: it.Now,
Status: Status{
Connected: true,
},
}
s.TrackDisconnectedDueToError("disconnect err")
require.False(t, s.IsConnected())
require.False(t, s.Connected)
require.Equal(t, disconnectTime, s.DisconnectTime)
require.Equal(t, "disconnect err", s.DisconnectErrorMessage)
}

View File

@ -511,6 +511,8 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
// reconcile pbpeering.PeeringState_Active
if streamState.Connected {
cp.State = pbpeering.PeeringState_ACTIVE
} else if streamState.DisconnectErrorMessage != "" {
cp.State = pbpeering.PeeringState_FAILING
}
// add imported & exported services counts