Merge pull request #14747 from hashicorp/kisunji/NET-801-add-peer-stream-status

This commit is contained in:
Freddy 2022-10-10 14:07:54 -06:00 committed by GitHub
commit a73c6a26c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 710 additions and 541 deletions

3
.changelog/14747.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
peering: return information about the health of the peering when the leader is queried to read a peering.
```

View File

@ -1144,15 +1144,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ImportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ImportedServices))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ImportedServices))
// on List
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ImportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ImportedServices))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ImportedServices))
})
// Check that exported services count on S1 are what we expect
@ -1161,15 +1159,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ExportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ExportedServices))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ExportedServices))
// on List
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ExportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ExportedServices))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ExportedServices))
})
})
}

View File

@ -584,12 +584,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
req.Peering.State = existing.State
}
// TODO(peering): Confirm behavior when /peering/token is called more than once.
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
req.Peering.ImportedServices = existing.ImportedServices
req.Peering.ExportedServices = existing.ExportedServices
req.Peering.StreamStatus = nil
req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx
} else {

View File

@ -351,8 +351,14 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
err := streamReq.Stream.Send(msg)
sendMutex.Unlock()
if err != nil {
status.TrackSendError(err.Error())
// We only track send successes and errors for response types because this is meant to track
// resources, not request/ack messages.
if msg.GetResponse() != nil {
if err != nil {
status.TrackSendError(err.Error())
} else {
status.TrackSendSuccess()
}
}
return err
}

View File

@ -572,9 +572,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
})
var lastSendAck time.Time
var lastSendSuccess time.Time
client.DrainStream(t)
// Manually grab the last success time from sending the trust bundle or exported services list.
status, ok := srv.StreamStatus(testPeerID)
require.True(t, ok)
lastSendSuccess = status.LastSendSuccess
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
@ -589,11 +595,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}
lastSendAck = it.FutureNow(1)
err := client.Send(ack)
require.NoError(t, err)
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
ExportedServices: []string{},
}
@ -631,6 +639,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
@ -682,6 +691,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
@ -737,6 +747,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
@ -766,6 +777,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: true,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,
@ -793,6 +805,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expect := Status{
Connected: false,
DisconnectErrorMessage: lastRecvErrorMsg,
LastSendSuccess: lastSendSuccess,
LastAck: lastSendAck,
LastNack: lastNack,
LastNackMessage: lastNackMsg,

View File

@ -214,6 +214,9 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
LastSendSuccess time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time
@ -271,6 +274,12 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock()
}
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock()

View File

@ -375,11 +375,8 @@ func TestHTTP_Peering_Read(t *testing.T) {
require.Equal(t, foo.Peering.Name, apiResp.Name)
require.Equal(t, foo.Peering.Meta, apiResp.Meta)
require.Equal(t, uint64(0), apiResp.ImportedServiceCount)
require.Equal(t, uint64(0), apiResp.ExportedServiceCount)
require.Equal(t, 0, len(apiResp.ImportedServices))
require.Equal(t, 0, len(apiResp.ExportedServices))
require.Equal(t, 0, len(apiResp.StreamStatus.ImportedServices))
require.Equal(t, 0, len(apiResp.StreamStatus.ExportedServices))
})
t.Run("not found", func(t *testing.T) {
@ -507,10 +504,8 @@ func TestHTTP_Peering_List(t *testing.T) {
require.Len(t, apiResp, 2)
for _, p := range apiResp {
require.Equal(t, uint64(0), p.ImportedServiceCount)
require.Equal(t, uint64(0), p.ExportedServiceCount)
require.Equal(t, 0, len(p.ImportedServices))
require.Equal(t, 0, len(p.ExportedServices))
require.Equal(t, 0, len(p.StreamStatus.ImportedServices))
require.Equal(t, 0, len(p.StreamStatus.ExportedServices))
}
})
}

View File

@ -632,8 +632,10 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
streamState, found := s.Tracker.StreamStatus(peering.ID)
if !found {
// TODO(peering): this may be noise on non-leaders
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
" exported services count or reconcile peering state", "peerID", peering.ID)
peering.StreamStatus = &pbpeering.StreamStatus{}
return peering
} else {
cp := copyPeering(peering)
@ -645,11 +647,26 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
cp.State = pbpeering.PeeringState_FAILING
}
// add imported & exported services
cp.ImportedServices = streamState.ImportedServices
cp.ExportedServices = streamState.ExportedServices
cp.ImportedServiceCount = streamState.GetImportedServicesCount()
cp.ExportedServiceCount = streamState.GetExportedServicesCount()
latest := func(tt ...time.Time) time.Time {
latest := time.Time{}
for _, t := range tt {
if t.After(latest) {
latest = t
}
}
return latest
}
lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess)
lastSend := latest(streamState.LastSendError, streamState.LastSendSuccess)
cp.StreamStatus = &pbpeering.StreamStatus{
ImportedServices: streamState.ImportedServices,
ExportedServices: streamState.ExportedServices,
LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat),
LastReceive: structs.TimeToProto(lastRecv),
LastSend: structs.TimeToProto(lastSend),
}
return cp
}

View File

@ -62,20 +62,27 @@ type Peering struct {
PeerServerName string `json:",omitempty"`
// PeerServerAddresses contains all the connection addresses for the remote peer.
PeerServerAddresses []string `json:",omitempty"`
// ImportedServiceCount is the count of how many services are imported from this peering.
ImportedServiceCount uint64
// ExportedServiceCount is the count of how many services are exported to this peering.
ExportedServiceCount uint64
// ImportedServices is the list of services imported from this peering.
ImportedServices []string
// ExportedServices is the list of services exported to this peering.
ExportedServices []string
// StreamStatus contains information computed on read based on the state of the stream.
StreamStatus PeeringStreamStatus
// CreateIndex is the Raft index at which the Peering was created.
CreateIndex uint64
// ModifyIndex is the latest Raft index at which the Peering. was modified.
ModifyIndex uint64
}
type PeeringStreamStatus struct {
// ImportedServices is the list of services imported from this peering.
ImportedServices []string
// ExportedServices is the list of services exported to this peering.
ExportedServices []string
// LastHeartbeat represents when the last heartbeat message was received.
LastHeartbeat time.Time
// LastReceive represents when any message was last received, regardless of success or error.
LastReceive time.Time
// LastSend represents when any message was last sent, regardless of success or error.
LastSend time.Time
}
type PeeringReadResponse struct {
Peering *Peering
}

View File

@ -26,10 +26,7 @@ func peerExistsInPeerListings(peer *Peering, peerings []*Peering) bool {
(peer.State == aPeer.State) &&
(peer.CreateIndex == aPeer.CreateIndex) &&
(peer.ModifyIndex == aPeer.ModifyIndex) &&
(peer.ImportedServiceCount == aPeer.ImportedServiceCount) &&
(peer.ExportedServiceCount == aPeer.ExportedServiceCount) &&
reflect.DeepEqual(peer.ImportedServices, aPeer.ImportedServices) &&
reflect.DeepEqual(peer.ExportedServices, aPeer.ExportedServices)
(reflect.DeepEqual(peer.StreamStatus, aPeer.StreamStatus))
if isEqual {
return true

View File

@ -90,6 +90,7 @@ func (c *cmd) Run(args []string) int {
}
result := make([]string, 0, len(list))
// TODO(peering): consider adding more StreamStatus fields here
header := "Name\x1fState\x1fImported Svcs\x1fExported Svcs\x1fMeta"
result = append(result, header)
for _, peer := range list {
@ -99,7 +100,7 @@ func (c *cmd) Run(args []string) int {
}
meta := strings.Join(metaPairs, ",")
line := fmt.Sprintf("%s\x1f%s\x1f%d\x1f%d\x1f%s",
peer.Name, peer.State, len(peer.ImportedServices), len(peer.ExportedServices), meta)
peer.Name, peer.State, len(peer.StreamStatus.ImportedServices), len(peer.StreamStatus.ExportedServices), meta)
result = append(result, line)
}

View File

@ -130,9 +130,12 @@ func formatPeering(peering *api.Peering) string {
}
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.ImportedServices)))
buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.ExportedServices)))
buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.StreamStatus.ImportedServices)))
buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.StreamStatus.ExportedServices)))
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Last Heartbeat: %v\n", peering.StreamStatus.LastHeartbeat))
buffer.WriteString(fmt.Sprintf("Last Send: %v\n", peering.StreamStatus.LastSend))
buffer.WriteString(fmt.Sprintf("Last Receive: %v\n", peering.StreamStatus.LastReceive))
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Create Index: %d\n", peering.CreateIndex))
buffer.WriteString(fmt.Sprintf("Modify Index: %d\n", peering.ModifyIndex))

View File

@ -109,6 +109,9 @@ func TestReadCommand(t *testing.T) {
require.Contains(t, output, "env=production")
require.Contains(t, output, "Imported Services")
require.Contains(t, output, "Exported Services")
require.Contains(t, output, "Last Heartbeat")
require.Contains(t, output, "Last Send")
require.Contains(t, output, "Last Receive")
})
t.Run("read with json", func(t *testing.T) {

View File

@ -76,10 +76,7 @@ func PeeringToAPI(s *Peering, t *api.Peering) {
t.PeerCAPems = s.PeerCAPems
t.PeerServerName = s.PeerServerName
t.PeerServerAddresses = s.PeerServerAddresses
t.ImportedServiceCount = s.ImportedServiceCount
t.ExportedServiceCount = s.ExportedServiceCount
t.ImportedServices = s.ImportedServices
t.ExportedServices = s.ExportedServices
t.StreamStatus = StreamStatusToAPI(s.StreamStatus)
t.CreateIndex = s.CreateIndex
t.ModifyIndex = s.ModifyIndex
}
@ -97,10 +94,7 @@ func PeeringFromAPI(t *api.Peering, s *Peering) {
s.PeerCAPems = t.PeerCAPems
s.PeerServerName = t.PeerServerName
s.PeerServerAddresses = t.PeerServerAddresses
s.ImportedServiceCount = t.ImportedServiceCount
s.ExportedServiceCount = t.ExportedServiceCount
s.ImportedServices = t.ImportedServices
s.ExportedServices = t.ExportedServices
s.StreamStatus = StreamStatusFromAPI(t.StreamStatus)
s.CreateIndex = t.CreateIndex
s.ModifyIndex = t.ModifyIndex
}

View File

@ -142,6 +142,26 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState {
}
}
func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus {
return api.PeeringStreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeFromProto(status.LastHeartbeat),
LastReceive: structs.TimeFromProto(status.LastReceive),
LastSend: structs.TimeFromProto(status.LastSend),
}
}
func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus {
return &StreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeToProto(status.LastHeartbeat),
LastReceive: structs.TimeToProto(status.LastReceive),
LastSend: structs.TimeToProto(status.LastSend),
}
}
func (p *Peering) IsActive() bool {
if p == nil || p.State == PeeringState_TERMINATED {
return false

View File

@ -97,6 +97,16 @@ func (msg *Peering) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *StreamStatus) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *StreamStatus) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringTrustBundle) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -183,17 +183,10 @@ message Peering {
// PeerServerAddresses contains all the the connection addresses for the remote peer.
repeated string PeerServerAddresses = 10;
// ImportedServiceCount is the count of how many services are imported from this peering.
uint64 ImportedServiceCount = 13;
// ExportedServiceCount is the count of how many services are exported to this peering.
uint64 ExportedServiceCount = 14;
// ImportedServices is the list of services imported from this peering.
repeated string ImportedServices = 15;
// ExportedServices is the list of services exported to this peering.
repeated string ExportedServices = 16;
// StreamStatus contains information computed on read based on the state of the stream.
//
// mog: func-to=StreamStatusToAPI func-from=StreamStatusFromAPI
StreamStatus StreamStatus = 13;
// CreateIndex is the Raft index at which the Peering was created.
// @gotags: bexpr:"-"
@ -204,6 +197,24 @@ message Peering {
uint64 ModifyIndex = 12;
}
// StreamStatus represents information about an active peering stream.
message StreamStatus {
// ImportedServices is the list of services imported from this peering.
repeated string ImportedServices = 1;
// ExportedServices is the list of services exported to this peering.
repeated string ExportedServices = 2;
// LastHeartbeat represents when the last heartbeat message was received.
google.protobuf.Timestamp LastHeartbeat = 3;
// LastReceive represents when any message was last received, regardless of success or error.
google.protobuf.Timestamp LastReceive = 4;
// LastSend represents when any message was last sent, regardless of success or error.
google.protobuf.Timestamp LastSend = 5;
}
// PeeringTrustBundle holds the trust information for validating requests from a peer.
message PeeringTrustBundle {
// TrustDomain is the domain for the bundle, example.com, foo.bar.gov for example. Note that this must not have a prefix such as "spiffe://".