Include stream-related information in peering endpoints

This commit is contained in:
Chris S. Kim 2022-09-23 17:51:41 -04:00 committed by freddygv
parent 7770be3d57
commit b0a4c5c563
16 changed files with 686 additions and 539 deletions

View File

@ -1144,15 +1144,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"}) resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(r, err) require.NoError(r, err)
require.NotNil(r, resp.Peering) require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ImportedServiceCount)) require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ImportedServices))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ImportedServices))
// on List // on List
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{}) resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2) require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings) require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ImportedServiceCount)) require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ImportedServices))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ImportedServices))
}) })
// Check that exported services count on S1 are what we expect // Check that exported services count on S1 are what we expect
@ -1161,15 +1159,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"}) resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
require.NoError(r, err) require.NoError(r, err)
require.NotNil(r, resp.Peering) require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ExportedServiceCount)) require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.StreamStatus.ExportedServices))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ExportedServices))
// on List // on List
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{}) resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2) require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings) require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ExportedServiceCount)) require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].StreamStatus.ExportedServices))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ExportedServices))
}) })
}) })
} }

View File

@ -584,12 +584,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
if req.Peering.State == pbpeering.PeeringState_UNDEFINED { if req.Peering.State == pbpeering.PeeringState_UNDEFINED {
req.Peering.State = existing.State req.Peering.State = existing.State
} }
// TODO(peering): Confirm behavior when /peering/token is called more than once. req.Peering.StreamStatus = nil
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
req.Peering.ImportedServices = existing.ImportedServices
req.Peering.ExportedServices = existing.ExportedServices
req.Peering.CreateIndex = existing.CreateIndex req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx req.Peering.ModifyIndex = idx
} else { } else {

View File

@ -353,6 +353,8 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if err != nil { if err != nil {
status.TrackSendError(err.Error()) status.TrackSendError(err.Error())
} else {
status.TrackSendSuccess()
} }
return err return err
} }

View File

@ -214,6 +214,9 @@ type Status struct {
// LastSendErrorMessage tracks the last error message when sending into the stream. // LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string LastSendErrorMessage string
// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
LastSendSuccess time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer. // LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat time.Time LastRecvHeartbeat time.Time
@ -271,6 +274,12 @@ func (s *MutableStatus) TrackSendError(error string) {
s.mu.Unlock() s.mu.Unlock()
} }
func (s *MutableStatus) TrackSendSuccess() {
s.mu.Lock()
s.LastSendSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
// TrackRecvResourceSuccess tracks receiving a replicated resource. // TrackRecvResourceSuccess tracks receiving a replicated resource.
func (s *MutableStatus) TrackRecvResourceSuccess() { func (s *MutableStatus) TrackRecvResourceSuccess() {
s.mu.Lock() s.mu.Lock()

View File

@ -375,11 +375,8 @@ func TestHTTP_Peering_Read(t *testing.T) {
require.Equal(t, foo.Peering.Name, apiResp.Name) require.Equal(t, foo.Peering.Name, apiResp.Name)
require.Equal(t, foo.Peering.Meta, apiResp.Meta) require.Equal(t, foo.Peering.Meta, apiResp.Meta)
require.Equal(t, uint64(0), apiResp.ImportedServiceCount) require.Equal(t, 0, len(apiResp.StreamStatus.ImportedServices))
require.Equal(t, uint64(0), apiResp.ExportedServiceCount) require.Equal(t, 0, len(apiResp.StreamStatus.ExportedServices))
require.Equal(t, 0, len(apiResp.ImportedServices))
require.Equal(t, 0, len(apiResp.ExportedServices))
}) })
t.Run("not found", func(t *testing.T) { t.Run("not found", func(t *testing.T) {
@ -507,10 +504,8 @@ func TestHTTP_Peering_List(t *testing.T) {
require.Len(t, apiResp, 2) require.Len(t, apiResp, 2)
for _, p := range apiResp { for _, p := range apiResp {
require.Equal(t, uint64(0), p.ImportedServiceCount) require.Equal(t, 0, len(p.StreamStatus.ImportedServices))
require.Equal(t, uint64(0), p.ExportedServiceCount) require.Equal(t, 0, len(p.StreamStatus.ExportedServices))
require.Equal(t, 0, len(p.ImportedServices))
require.Equal(t, 0, len(p.ExportedServices))
} }
}) })
} }

View File

@ -645,11 +645,26 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
cp.State = pbpeering.PeeringState_FAILING cp.State = pbpeering.PeeringState_FAILING
} }
// add imported & exported services latest := func(tt ...time.Time) time.Time {
cp.ImportedServices = streamState.ImportedServices latest := time.Time{}
cp.ExportedServices = streamState.ExportedServices for _, t := range tt {
cp.ImportedServiceCount = streamState.GetImportedServicesCount() if t.After(latest) {
cp.ExportedServiceCount = streamState.GetExportedServicesCount() latest = t
}
}
return latest
}
lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess)
lastSend := latest(streamState.LastSendError, streamState.LastSendSuccess)
cp.StreamStatus = &pbpeering.StreamStatus{
ImportedServices: streamState.ImportedServices,
ExportedServices: streamState.ExportedServices,
LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat),
LastReceive: structs.TimeToProto(lastRecv),
LastSend: structs.TimeToProto(lastSend),
}
return cp return cp
} }

View File

@ -62,20 +62,27 @@ type Peering struct {
PeerServerName string `json:",omitempty"` PeerServerName string `json:",omitempty"`
// PeerServerAddresses contains all the connection addresses for the remote peer. // PeerServerAddresses contains all the connection addresses for the remote peer.
PeerServerAddresses []string `json:",omitempty"` PeerServerAddresses []string `json:",omitempty"`
// ImportedServiceCount is the count of how many services are imported from this peering. // StreamStatus contains information computed on read based on the state of the stream.
ImportedServiceCount uint64 StreamStatus PeeringStreamStatus
// ExportedServiceCount is the count of how many services are exported to this peering.
ExportedServiceCount uint64
// ImportedServices is the list of services imported from this peering.
ImportedServices []string
// ExportedServices is the list of services exported to this peering.
ExportedServices []string
// CreateIndex is the Raft index at which the Peering was created. // CreateIndex is the Raft index at which the Peering was created.
CreateIndex uint64 CreateIndex uint64
// ModifyIndex is the latest Raft index at which the Peering. was modified. // ModifyIndex is the latest Raft index at which the Peering. was modified.
ModifyIndex uint64 ModifyIndex uint64
} }
type PeeringStreamStatus struct {
// ImportedServices is the list of services imported from this peering.
ImportedServices []string
// ExportedServices is the list of services exported to this peering.
ExportedServices []string
// LastHeartbeat represents when the last heartbeat message was received.
LastHeartbeat time.Time
// LastReceive represents when any message was last received, regardless of success or error.
LastReceive time.Time
// LastSend represents when any message was last sent, regardless of success or error.
LastSend time.Time
}
type PeeringReadResponse struct { type PeeringReadResponse struct {
Peering *Peering Peering *Peering
} }

View File

@ -26,10 +26,7 @@ func peerExistsInPeerListings(peer *Peering, peerings []*Peering) bool {
(peer.State == aPeer.State) && (peer.State == aPeer.State) &&
(peer.CreateIndex == aPeer.CreateIndex) && (peer.CreateIndex == aPeer.CreateIndex) &&
(peer.ModifyIndex == aPeer.ModifyIndex) && (peer.ModifyIndex == aPeer.ModifyIndex) &&
(peer.ImportedServiceCount == aPeer.ImportedServiceCount) && (reflect.DeepEqual(peer.StreamStatus, aPeer.StreamStatus))
(peer.ExportedServiceCount == aPeer.ExportedServiceCount) &&
reflect.DeepEqual(peer.ImportedServices, aPeer.ImportedServices) &&
reflect.DeepEqual(peer.ExportedServices, aPeer.ExportedServices)
if isEqual { if isEqual {
return true return true

View File

@ -90,6 +90,7 @@ func (c *cmd) Run(args []string) int {
} }
result := make([]string, 0, len(list)) result := make([]string, 0, len(list))
// TODO(peering): consider adding more StreamStatus fields here
header := "Name\x1fState\x1fImported Svcs\x1fExported Svcs\x1fMeta" header := "Name\x1fState\x1fImported Svcs\x1fExported Svcs\x1fMeta"
result = append(result, header) result = append(result, header)
for _, peer := range list { for _, peer := range list {
@ -99,7 +100,7 @@ func (c *cmd) Run(args []string) int {
} }
meta := strings.Join(metaPairs, ",") meta := strings.Join(metaPairs, ",")
line := fmt.Sprintf("%s\x1f%s\x1f%d\x1f%d\x1f%s", line := fmt.Sprintf("%s\x1f%s\x1f%d\x1f%d\x1f%s",
peer.Name, peer.State, len(peer.ImportedServices), len(peer.ExportedServices), meta) peer.Name, peer.State, len(peer.StreamStatus.ImportedServices), len(peer.StreamStatus.ExportedServices), meta)
result = append(result, line) result = append(result, line)
} }

View File

@ -130,9 +130,12 @@ func formatPeering(peering *api.Peering) string {
} }
buffer.WriteString("\n") buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.ImportedServices))) buffer.WriteString(fmt.Sprintf("Imported Services: %d\n", len(peering.StreamStatus.ImportedServices)))
buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.ExportedServices))) buffer.WriteString(fmt.Sprintf("Exported Services: %d\n", len(peering.StreamStatus.ExportedServices)))
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Last Heartbeat: %v\n", peering.StreamStatus.LastHeartbeat))
buffer.WriteString(fmt.Sprintf("Last Send: %v\n", peering.StreamStatus.LastSend))
buffer.WriteString(fmt.Sprintf("Last Receive: %v\n", peering.StreamStatus.LastReceive))
buffer.WriteString("\n") buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf("Create Index: %d\n", peering.CreateIndex)) buffer.WriteString(fmt.Sprintf("Create Index: %d\n", peering.CreateIndex))
buffer.WriteString(fmt.Sprintf("Modify Index: %d\n", peering.ModifyIndex)) buffer.WriteString(fmt.Sprintf("Modify Index: %d\n", peering.ModifyIndex))

View File

@ -109,6 +109,9 @@ func TestReadCommand(t *testing.T) {
require.Contains(t, output, "env=production") require.Contains(t, output, "env=production")
require.Contains(t, output, "Imported Services") require.Contains(t, output, "Imported Services")
require.Contains(t, output, "Exported Services") require.Contains(t, output, "Exported Services")
require.Contains(t, output, "Last Heartbeat")
require.Contains(t, output, "Last Send")
require.Contains(t, output, "Last Receive")
}) })
t.Run("read with json", func(t *testing.T) { t.Run("read with json", func(t *testing.T) {

View File

@ -76,10 +76,7 @@ func PeeringToAPI(s *Peering, t *api.Peering) {
t.PeerCAPems = s.PeerCAPems t.PeerCAPems = s.PeerCAPems
t.PeerServerName = s.PeerServerName t.PeerServerName = s.PeerServerName
t.PeerServerAddresses = s.PeerServerAddresses t.PeerServerAddresses = s.PeerServerAddresses
t.ImportedServiceCount = s.ImportedServiceCount t.StreamStatus = StreamStatusToAPI(s.StreamStatus)
t.ExportedServiceCount = s.ExportedServiceCount
t.ImportedServices = s.ImportedServices
t.ExportedServices = s.ExportedServices
t.CreateIndex = s.CreateIndex t.CreateIndex = s.CreateIndex
t.ModifyIndex = s.ModifyIndex t.ModifyIndex = s.ModifyIndex
} }
@ -97,10 +94,7 @@ func PeeringFromAPI(t *api.Peering, s *Peering) {
s.PeerCAPems = t.PeerCAPems s.PeerCAPems = t.PeerCAPems
s.PeerServerName = t.PeerServerName s.PeerServerName = t.PeerServerName
s.PeerServerAddresses = t.PeerServerAddresses s.PeerServerAddresses = t.PeerServerAddresses
s.ImportedServiceCount = t.ImportedServiceCount s.StreamStatus = StreamStatusFromAPI(t.StreamStatus)
s.ExportedServiceCount = t.ExportedServiceCount
s.ImportedServices = t.ImportedServices
s.ExportedServices = t.ExportedServices
s.CreateIndex = t.CreateIndex s.CreateIndex = t.CreateIndex
s.ModifyIndex = t.ModifyIndex s.ModifyIndex = t.ModifyIndex
} }

View File

@ -142,6 +142,26 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState {
} }
} }
func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus {
return api.PeeringStreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeFromProto(status.LastHeartbeat),
LastReceive: structs.TimeFromProto(status.LastReceive),
LastSend: structs.TimeFromProto(status.LastSend),
}
}
func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus {
return &StreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: structs.TimeToProto(status.LastHeartbeat),
LastReceive: structs.TimeToProto(status.LastReceive),
LastSend: structs.TimeToProto(status.LastSend),
}
}
func (p *Peering) IsActive() bool { func (p *Peering) IsActive() bool {
if p == nil || p.State == PeeringState_TERMINATED { if p == nil || p.State == PeeringState_TERMINATED {
return false return false

View File

@ -97,6 +97,16 @@ func (msg *Peering) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *StreamStatus) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *StreamStatus) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringTrustBundle) MarshalBinary() ([]byte, error) { func (msg *PeeringTrustBundle) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -183,17 +183,10 @@ message Peering {
// PeerServerAddresses contains all the the connection addresses for the remote peer. // PeerServerAddresses contains all the the connection addresses for the remote peer.
repeated string PeerServerAddresses = 10; repeated string PeerServerAddresses = 10;
// ImportedServiceCount is the count of how many services are imported from this peering. // StreamStatus contains information computed on read based on the state of the stream.
uint64 ImportedServiceCount = 13; //
// mog: func-to=StreamStatusToAPI func-from=StreamStatusFromAPI
// ExportedServiceCount is the count of how many services are exported to this peering. StreamStatus StreamStatus = 13;
uint64 ExportedServiceCount = 14;
// ImportedServices is the list of services imported from this peering.
repeated string ImportedServices = 15;
// ExportedServices is the list of services exported to this peering.
repeated string ExportedServices = 16;
// CreateIndex is the Raft index at which the Peering was created. // CreateIndex is the Raft index at which the Peering was created.
// @gotags: bexpr:"-" // @gotags: bexpr:"-"
@ -204,6 +197,24 @@ message Peering {
uint64 ModifyIndex = 12; uint64 ModifyIndex = 12;
} }
// StreamStatus represents information about an active peering stream.
message StreamStatus {
// ImportedServices is the list of services imported from this peering.
repeated string ImportedServices = 1;
// ExportedServices is the list of services exported to this peering.
repeated string ExportedServices = 2;
// LastHeartbeat represents when the last heartbeat message was received.
google.protobuf.Timestamp LastHeartbeat = 3;
// LastReceive represents when any message was last received, regardless of success or error.
google.protobuf.Timestamp LastReceive = 4;
// LastSend represents when any message was last sent, regardless of success or error.
google.protobuf.Timestamp LastSend = 5;
}
// PeeringTrustBundle holds the trust information for validating requests from a peer. // PeeringTrustBundle holds the trust information for validating requests from a peer.
message PeeringTrustBundle { message PeeringTrustBundle {
// TrustDomain is the domain for the bundle, example.com, foo.bar.gov for example. Note that this must not have a prefix such as "spiffe://". // TrustDomain is the domain for the bundle, example.com, foo.bar.gov for example. Note that this must not have a prefix such as "spiffe://".