From de5a991d8c0e2857963fd2ac80b776b19127fcf7 Mon Sep 17 00:00:00 2001 From: alex <8968914+acpana@users.noreply.github.com> Date: Tue, 19 Jul 2022 11:43:29 -0700 Subject: [PATCH] peering: refactor reconcile, cleanup (#13795) Signed-off-by: acpana <8968914+acpana@users.noreply.github.com> --- agent/consul/leader_peering_test.go | 32 +++++--- .../services/peerstream/replication.go | 6 -- .../services/peerstream/stream_tracker.go | 8 ++ agent/rpc/peering/service.go | 80 +++++++------------ api/peering_test.go | 4 +- 5 files changed, 59 insertions(+), 71 deletions(-) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index c21156b429..c3196a54ec 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -747,10 +747,11 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { /// finished adding services type testCase struct { - name string - description string - exportedService structs.ExportedServicesConfigEntry - expectedImportedExportedServicesCount uint64 // same count for a server that imports the services form a server that exports them + name string + description string + exportedService structs.ExportedServicesConfigEntry + expectedImportedServsCount uint64 + expectedExportedServsCount uint64 } testCases := []testCase{ @@ -770,7 +771,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }, }, }, - expectedImportedExportedServicesCount: 4, // 3 services from above + the "consul" service + expectedImportedServsCount: 4, // 3 services from above + the "consul" service + expectedExportedServsCount: 4, // 3 services from above + the "consul" service }, { name: "no sync", @@ -778,7 +780,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { exportedService: structs.ExportedServicesConfigEntry{ Name: "default", }, - expectedImportedExportedServicesCount: 0, // we want to see this decremented from 4 --> 0 + expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0 + expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0 }, { name: "just a, b services", @@ -804,7 +807,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }, }, }, - expectedImportedExportedServicesCount: 2, + expectedImportedServsCount: 2, + expectedExportedServsCount: 2, }, { name: "unexport b service", @@ -822,7 +826,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }, }, }, - expectedImportedExportedServicesCount: 1, + expectedImportedServsCount: 1, + expectedExportedServsCount: 1, }, { name: "export c service", @@ -848,7 +853,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }, }, }, - expectedImportedExportedServicesCount: 2, + expectedImportedServsCount: 2, + expectedExportedServsCount: 2, }, } @@ -872,13 +878,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"}) require.NoError(r, err) require.NotNil(r, resp.Peering) - require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ImportedServiceCount) + require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ImportedServiceCount) // on List resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{}) require.NoError(r, err2) require.NotEmpty(r, resp2.Peerings) - require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ImportedServiceCount) + require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ImportedServiceCount) }) // Check that exported services count on S1 are what we expect @@ -887,13 +893,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"}) require.NoError(r, err) require.NotNil(r, resp.Peering) - require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ExportedServiceCount) + require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ExportedServiceCount) // on List resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{}) require.NoError(r, err2) require.NotEmpty(r, resp2.Peerings) - require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ExportedServiceCount) + require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ExportedServiceCount) }) }) } diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 938605cac4..be79a23bd5 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -43,7 +43,6 @@ func makeServiceResponse( sn := structs.ServiceNameFromString(serviceName) csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) if !ok { - logger.Error("did not increment or decrement exported services count", "service_name", serviceName) return nil, fmt.Errorf("invalid type for service response: %T", update.Result) } @@ -63,7 +62,6 @@ func makeServiceResponse( // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. // Case #1 is a no-op for the importing peer. if len(csn.Nodes) == 0 { - logger.Trace("decrementing exported services count", "service_name", sn.String()) mst.RemoveExportedService(sn) return &pbpeerstream.ReplicationMessage_Response{ @@ -75,7 +73,6 @@ func makeServiceResponse( }, nil } - logger.Trace("incrementing exported services count", "service_name", sn.String()) mst.TrackExportedService(sn) // If there are nodes in the response, we push them as an UPSERT operation. @@ -220,7 +217,6 @@ func (s *Server) handleUpsert( return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err) } - logger.Trace("incrementing imported services count", "service_name", sn.String()) mutableStatus.TrackImportedService(sn) return nil @@ -468,11 +464,9 @@ func (s *Server) handleDelete( err := s.handleUpdateService(peerName, partition, sn, nil) if err != nil { - logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err) return err } - logger.Trace("decrementing imported services count", "service_name", sn.String()) mutableStatus.RemoveImportedService(sn) return nil diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index 40d8ecb632..c9d41e127c 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -173,6 +173,14 @@ type Status struct { ExportedServices map[string]struct{} } +func (s *Status) GetImportedServicesCount() uint64 { + return uint64(len(s.ImportedServices)) +} + +func (s *Status) GetExportedServicesCount() uint64 { + return uint64(len(s.ExportedServices)) +} + func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { return &MutableStatus{ Status: Status{ diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 845818eb22..4b7d051bce 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -338,17 +339,7 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ return &pbpeering.PeeringReadResponse{Peering: nil}, nil } - cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State)) - - // add imported services count - st, found := s.Tracker.StreamStatus(peering.ID) - if !found { - s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID) - } else { - cp.ImportedServiceCount = uint64(len(st.ImportedServices)) - cp.ExportedServiceCount = uint64(len(st.ExportedServices)) - } - + cp := s.reconcilePeering(peering) return &pbpeering.PeeringReadResponse{Peering: cp}, nil } @@ -379,34 +370,38 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ // reconcile the actual peering state; need to copy over the ds for peering var cPeerings []*pbpeering.Peering for _, p := range peerings { - cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State)) - - // add imported services count - st, found := s.Tracker.StreamStatus(p.ID) - if !found { - s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID) - } else { - cp.ImportedServiceCount = uint64(len(st.ImportedServices)) - cp.ExportedServiceCount = uint64(len(st.ExportedServices)) - } - + cp := s.reconcilePeering(p) cPeerings = append(cPeerings, cp) } + return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil } -// TODO(peering): Maybe get rid of this when actually monitoring the stream health -// reconciledStreamStateHint peaks into the streamTracker and determines whether a peering should be marked -// as PeeringState.Active or not -func (s *Server) reconciledStreamStateHint(pID string, pState pbpeering.PeeringState) pbpeering.PeeringState { - streamState, found := s.Tracker.StreamStatus(pID) +// TODO(peering): Get rid of this func when we stop using the stream tracker for imported/ exported services and the peering state +// reconcilePeering enriches the peering with the following information: +// -- PeeringState.Active if the peering is active +// -- ImportedServicesCount and ExportedServicesCount +// NOTE: we return a new peering with this additional data +func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering { + streamState, found := s.Tracker.StreamStatus(peering.ID) + if !found { + s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+ + " exported services count or reconcile peering state", "peerID", peering.ID) + return peering + } else { + cp := copyPeering(peering) - if found && streamState.Connected { - return pbpeering.PeeringState_ACTIVE + // reconcile pbpeering.PeeringState_Active + if streamState.Connected { + cp.State = pbpeering.PeeringState_ACTIVE + } + + // add imported & exported services counts + cp.ImportedServiceCount = streamState.GetImportedServicesCount() + cp.ExportedServiceCount = streamState.GetExportedServicesCount() + + return cp } - - // default, no reconciliation - return pState } // TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. @@ -605,22 +600,9 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin return id, nil } -func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering { - return &pbpeering.Peering{ - ID: p.ID, - Name: p.Name, - Partition: p.Partition, - DeletedAt: p.DeletedAt, - Meta: p.Meta, - PeerID: p.PeerID, - PeerCAPems: p.PeerCAPems, - PeerServerAddresses: p.PeerServerAddresses, - PeerServerName: p.PeerServerName, - CreateIndex: p.CreateIndex, - ModifyIndex: p.ModifyIndex, - ImportedServiceCount: p.ImportedServiceCount, - ExportedServiceCount: p.ExportedServiceCount, +func copyPeering(p *pbpeering.Peering) *pbpeering.Peering { + var copyP pbpeering.Peering + proto.Merge(©P, p) - State: state, - } + return ©P } diff --git a/api/peering_test.go b/api/peering_test.go index 1c022a9cf2..fcd7c5b3ce 100644 --- a/api/peering_test.go +++ b/api/peering_test.go @@ -152,9 +152,7 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) { }) defer s2.Stop() - testutil.RunStep(t, "register services to get synced dc2", func(t *testing.T) { - testNodeServiceCheckRegistrations(t, c2, "dc2") - }) + testNodeServiceCheckRegistrations(t, c2, "dc2") ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()