diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index aa720bd6b5..c21156b429 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -616,7 +616,7 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId } // TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe -func TestLeader_Peering_ImportedServicesCount(t *testing.T) { +func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -747,10 +747,10 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { /// finished adding services type testCase struct { - name string - description string - exportedService structs.ExportedServicesConfigEntry - expectedImportedServicesCount uint64 + name string + description string + exportedService structs.ExportedServicesConfigEntry + expectedImportedExportedServicesCount uint64 // same count for a server that imports the services form a server that exports them } testCases := []testCase{ @@ -770,7 +770,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { }, }, }, - expectedImportedServicesCount: 4, // 3 services from above + the "consul" service + expectedImportedExportedServicesCount: 4, // 3 services from above + the "consul" service }, { name: "no sync", @@ -778,7 +778,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { exportedService: structs.ExportedServicesConfigEntry{ Name: "default", }, - expectedImportedServicesCount: 0, // we want to see this decremented from 4 --> 0 + expectedImportedExportedServicesCount: 0, // we want to see this decremented from 4 --> 0 }, { name: "just a, b services", @@ -804,7 +804,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { }, }, }, - expectedImportedServicesCount: 2, + expectedImportedExportedServicesCount: 2, }, { name: "unexport b service", @@ -822,7 +822,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { }, }, }, - expectedImportedServicesCount: 1, + expectedImportedExportedServicesCount: 1, }, { name: "export c service", @@ -848,7 +848,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { }, }, }, - expectedImportedServicesCount: 2, + expectedImportedExportedServicesCount: 2, }, } @@ -866,11 +866,34 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) { lastIdx++ require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService)) + // Check that imported services count on S2 are what we expect retry.Run(t, func(r *retry.R) { - resp2, err := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + // on Read + resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"}) require.NoError(r, err) + require.NotNil(r, resp.Peering) + require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ImportedServiceCount) + + // on List + resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + require.NoError(r, err2) require.NotEmpty(r, resp2.Peerings) - require.Equal(r, tc.expectedImportedServicesCount, resp2.Peerings[0].ImportedServiceCount) + require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ImportedServiceCount) + }) + + // Check that exported services count on S1 are what we expect + retry.Run(t, func(r *retry.R) { + // on Read + resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"}) + require.NoError(r, err) + require.NotNil(r, resp.Peering) + require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ExportedServiceCount) + + // on List + resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + require.NoError(r, err2) + require.NotEmpty(r, resp2.Peerings) + require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ExportedServiceCount) }) }) } diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index c69d705d3a..938605cac4 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -36,10 +36,14 @@ import ( // If there are no instances in the event, we consider that to be a de-registration. func makeServiceResponse( logger hclog.Logger, + mst *MutableStatus, update cache.UpdateEvent, ) (*pbpeerstream.ReplicationMessage_Response, error) { + serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) + sn := structs.ServiceNameFromString(serviceName) csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) if !ok { + logger.Error("did not increment or decrement exported services count", "service_name", serviceName) return nil, fmt.Errorf("invalid type for service response: %T", update.Result) } @@ -51,9 +55,6 @@ func makeServiceResponse( if err != nil { return nil, fmt.Errorf("failed to marshal: %w", err) } - - serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) - // If no nodes are present then it's due to one of: // 1. The service is newly registered or exported and yielded a transient empty update. // 2. All instances of the service were de-registered. @@ -61,7 +62,10 @@ func makeServiceResponse( // // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. // Case #1 is a no-op for the importing peer. - if len(export.Nodes) == 0 { + if len(csn.Nodes) == 0 { + logger.Trace("decrementing exported services count", "service_name", sn.String()) + mst.RemoveExportedService(sn) + return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, // TODO(peering): Nonce management @@ -71,6 +75,9 @@ func makeServiceResponse( }, nil } + logger.Trace("incrementing exported services count", "service_name", sn.String()) + mst.TrackExportedService(sn) + // If there are nodes in the response, we push them as an UPSERT operation. return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 26d5a7b004..c67f7da041 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -424,7 +424,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { var resp *pbpeerstream.ReplicationMessage_Response switch { case strings.HasPrefix(update.CorrelationID, subExportedService): - resp, err = makeServiceResponse(logger, update) + resp, err = makeServiceResponse(logger, status, update) if err != nil { // Log the error and skip this response to avoid locking up peering due to a bad update event. logger.Error("failed to create service response", "error", err) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 1e3117ecc5..d5f9e2c36c 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -22,6 +22,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" @@ -1006,6 +1007,53 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er return nil } +func Test_makeServiceResponse_ExportedServicesCount(t *testing.T) { + peerName := "billing" + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" + + srv, store := newTestServer(t, nil) + require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{ + ID: peerID, + Name: peerName}, + )) + + // connect the stream + mst, err := srv.Tracker.Connected(peerID) + require.NoError(t, err) + + testutil.RunStep(t, "simulate an update to export a service", func(t *testing.T) { + update := cache.UpdateEvent{ + CorrelationID: subExportedService + "api", + Result: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + PeerName: peerName, + }, + }, + }, + }} + _, err := makeServiceResponse(srv.Logger, mst, update) + require.NoError(t, err) + + require.Equal(t, 1, mst.GetExportedServicesCount()) + }) + + testutil.RunStep(t, "simulate a delete for an exported service", func(t *testing.T) { + update := cache.UpdateEvent{ + CorrelationID: subExportedService + "api", + Result: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{}, + }} + _, err := makeServiceResponse(srv.Logger, mst, update) + require.NoError(t, err) + + require.Equal(t, 0, mst.GetExportedServicesCount()) + }) +} + func Test_processResponse_Validation(t *testing.T) { peerName := "billing" peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index 4244bbe09c..40d8ecb632 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -166,9 +166,11 @@ type Status struct { // - The last error message when receiving from the stream. LastReceiveErrorMessage string - // TODO(peering): consider keeping track of imported service counts thru raft - // ImportedServices is set that keeps track of which service names are imported for the peer + // TODO(peering): consider keeping track of imported and exported services thru raft + // ImportedServices keeps track of which service names are imported for the peer ImportedServices map[string]struct{} + // ExportedServices keeps track of which service names a peer asks to export + ExportedServices map[string]struct{} } func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { @@ -274,3 +276,28 @@ func (s *MutableStatus) GetImportedServicesCount() int { return len(s.ImportedServices) } + +func (s *MutableStatus) RemoveExportedService(sn structs.ServiceName) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.ExportedServices, sn.String()) +} + +func (s *MutableStatus) TrackExportedService(sn structs.ServiceName) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.ExportedServices == nil { + s.ExportedServices = make(map[string]struct{}) + } + + s.ExportedServices[sn.String()] = struct{}{} +} + +func (s *MutableStatus) GetExportedServicesCount() int { + s.mu.RLock() + defer s.mu.RUnlock() + + return len(s.ExportedServices) +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 47e39c2b7b..845818eb22 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -346,6 +346,7 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID) } else { cp.ImportedServiceCount = uint64(len(st.ImportedServices)) + cp.ExportedServiceCount = uint64(len(st.ExportedServices)) } return &pbpeering.PeeringReadResponse{Peering: cp}, nil @@ -386,6 +387,7 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID) } else { cp.ImportedServiceCount = uint64(len(st.ImportedServices)) + cp.ExportedServiceCount = uint64(len(st.ExportedServices)) } cPeerings = append(cPeerings, cp)