From 8742fbe14fd1738d438f2d3793022ac32c3c51fa Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Fri, 7 Oct 2022 16:45:49 -0500 Subject: [PATCH] Prevent consul peer-exports by discovery chain. --- agent/consul/internal_endpoint.go | 4 +- agent/consul/leader_peering_test.go | 8 ++-- agent/consul/state/peering.go | 37 +++++++++++-------- agent/consul/state/peering_test.go | 14 +++---- .../services/peerstream/stream_resources.go | 1 - .../peerstream/subscription_manager.go | 2 - .../proxycfg-glue/exported_peered_services.go | 2 +- .../exported_peered_services_test.go | 3 +- agent/proxycfg-glue/glue.go | 2 +- 9 files changed, 38 insertions(+), 35 deletions(-) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 6d43fc8f96..85a36dff01 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -632,7 +632,7 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta) + index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.Datacenter, args.EnterpriseMeta) if err != nil { return err } @@ -678,7 +678,7 @@ func (m *Internal) ExportedServicesForPeer(args *structs.ServiceDumpRequest, rep reply.Services = nil return errNotFound } - idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, "") + idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, args.Datacenter) if err != nil { return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err) } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 55d3e4370a..b6b70bd22e 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -1139,8 +1139,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }, }, }, - expectedImportedServsCount: 4, // 3 services from above + the "consul" service - expectedExportedServsCount: 4, // 3 services from above + the "consul" service + expectedImportedServsCount: 3, // 3 services from above + expectedExportedServsCount: 3, // 3 services from above }, { name: "no sync", @@ -1148,8 +1148,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { exportedService: structs.ExportedServicesConfigEntry{ Name: "default", }, - expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0 - expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0 + expectedImportedServsCount: 0, // we want to see this decremented from 3 --> 0 + expectedExportedServsCount: 0, // we want to see this decremented from 3 --> 0 }, { name: "just a, b services", diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index bec2f24df1..be2aa1c73a 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -722,7 +722,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc str return exportedServicesForPeerTxn(ws, tx, peering, dc) } -func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { +func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() @@ -733,7 +733,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl out := make(map[string]structs.ServiceList) for _, peering := range peerings { - idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, "") + idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, dc) if err != nil { return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) } @@ -761,6 +761,11 @@ func exportedServicesForPeerTxn( peering *pbpeering.Peering, dc string, ) (uint64, *structs.ExportedServiceList, error) { + // The DC must be specified in order to compile discovery chains. + if dc == "" { + return 0, nil, fmt.Errorf("datacenter cannot be empty") + } + maxIdx := peering.ModifyIndex entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) @@ -838,10 +843,6 @@ func exportedServicesForPeerTxn( maxIdx = idx } for _, sn := range discoChains { - // Prevent exporting the "consul" service. - if sn.Name == structs.ConsulServiceName { - continue - } discoSet[sn] = struct{}{} } } @@ -868,18 +869,24 @@ func exportedServicesForPeerTxn( } info.Protocol = protocol - if dc != "" && !structs.IsProtocolHTTPLike(protocol) { - // We only need to populate the targets for replication purposes for L4 protocols, which - // do not ultimately get intercepted by the mesh gateways. - idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) - if err != nil { - return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err) - } + idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err) + } + if idx > maxIdx { + maxIdx = idx + } - if idx > maxIdx { - maxIdx = idx + // Prevent the consul service from being exported by a discovery chain. + for _, t := range targets { + if t.Service == structs.ConsulServiceName { + return nil } + } + // We only need to populate the targets for replication purposes for L4 protocols, which + // do not ultimately get intercepted by the mesh gateways. + if !structs.IsProtocolHTTPLike(protocol) { sort.Slice(targets, func(i, j int) bool { return targets[i].ID < targets[j].ID }) diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index f4e2584a90..4751af23f8 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -1820,6 +1820,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ ID: "billing", Service: "billing", Port: 5000, })) + lastIdx++ // The consul service should never be exported. require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000, @@ -1877,12 +1878,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { Service: "payments-proxy", Port: 5000, })) + lastIdx++ // The consul service should never be exported. require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, - ID: structs.ConsulServiceID, + ID: structs.ConsulServiceID + "-2", Service: structs.ConsulServiceName, - Port: 8000, + Port: 8001, })) // Ensure everything is L7-capable. @@ -1941,9 +1943,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { // NOTE: no consul here }, DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ - newSN("consul-redirect"): { - Protocol: "http", - }, + // NOTE: no consul-redirect here newSN("billing"): { Protocol: "http", }, @@ -1987,9 +1987,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { // NOTE: no consul here }, DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ - newSN("consul-redirect"): { - Protocol: "http", - }, + // NOTE: no consul-redirect here newSN("payments"): { Protocol: "http", }, diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 2188e3d0b2..143b236380 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -646,7 +646,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { var resp *pbpeerstream.ReplicationMessage_Response switch { case strings.HasPrefix(update.CorrelationID, subExportedServiceList): - // TODO either filter here or at the source. resp, err = makeExportedServiceListResponse(status, update) if err != nil { // Log the error and skip this response to avoid locking up peering due to a bad update event. diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index 8bbdd21989..0f2e4bf79c 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -89,8 +89,6 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p state.publicUpdateCh = publicUpdateCh state.updateCh = updateCh - // TODO should I worry about all these goroutines emitting a `consul` object? - // Wrap our bare state store queries in goroutines that emit events. go m.notifyExportedServicesForPeerID(ctx, state, peerID) go m.notifyServerAddrUpdates(ctx, state.updateCh) diff --git a/agent/proxycfg-glue/exported_peered_services.go b/agent/proxycfg-glue/exported_peered_services.go index cc9bed717c..9b555dae2a 100644 --- a/agent/proxycfg-glue/exported_peered_services.go +++ b/agent/proxycfg-glue/exported_peered_services.go @@ -40,7 +40,7 @@ func (s *serverExportedPeeredServices) Notify(ctx context.Context, req *structs. return 0, nil, err } - index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.EnterpriseMeta) + index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.Datacenter, req.EnterpriseMeta) if err != nil { return 0, nil, err } diff --git a/agent/proxycfg-glue/exported_peered_services_test.go b/agent/proxycfg-glue/exported_peered_services_test.go index e9285d357b..8ba7390fbf 100644 --- a/agent/proxycfg-glue/exported_peered_services_test.go +++ b/agent/proxycfg-glue/exported_peered_services_test.go @@ -19,6 +19,7 @@ func TestServerExportedPeeredServices(t *testing.T) { t.Cleanup(cancel) store := state.NewStateStore(nil) + require.NoError(t, store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"})) for _, peer := range []string{"peer-1", "peer-2", "peer-3"} { require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.PeeringWriteRequest{ @@ -59,7 +60,7 @@ func TestServerExportedPeeredServices(t *testing.T) { GetStore: func() Store { return store }, ACLResolver: newStaticResolver(authz), }) - require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{}, "", eventCh)) + require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{Datacenter: "dc1"}, "", eventCh)) testutil.RunStep(t, "initial state", func(t *testing.T) { result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh) diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 039988d8ce..a188a0a852 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -36,7 +36,7 @@ type ServerDataSourceDeps struct { type Store interface { watch.StateStore - ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) + ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error) IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)