diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 76d5184d95..bec2f24df1 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -788,6 +788,10 @@ func exportedServicesForPeerTxn( // - use connect native mode for _, svc := range conf.Services { + // Prevent exporting the "consul" service. + if svc.Name == structs.ConsulServiceName { + continue + } svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) sawPeer := false @@ -818,6 +822,10 @@ func exportedServicesForPeerTxn( maxIdx = idx } for _, s := range typicalServices { + // Prevent exporting the "consul" service. + if s.Service.Name == structs.ConsulServiceName { + continue + } normalSet[s.Service] = struct{}{} } @@ -830,6 +838,10 @@ func exportedServicesForPeerTxn( maxIdx = idx } for _, sn := range discoChains { + // Prevent exporting the "consul" service. + if sn.Name == structs.ConsulServiceName { + continue + } discoSet[sn] = struct{}{} } } diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index 3babaee674..f4e2584a90 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -1745,6 +1745,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { entry := &structs.ExportedServicesConfigEntry{ Name: "default", Services: []structs.ExportedService{ + { + // The "consul" service should never be exported. + Name: structs.ConsulServiceName, + Consumers: []structs.ServiceConsumer{ + {Peer: "my-peering"}, + }, + }, { Name: "mysql", Consumers: []structs.ServiceConsumer{ @@ -1813,6 +1820,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ ID: "billing", Service: "billing", Port: 5000, })) + // The consul service should never be exported. + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000, + })) entry := &structs.ExportedServicesConfigEntry{ Name: "default", @@ -1866,6 +1877,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { Service: "payments-proxy", Port: 5000, })) + // The consul service should never be exported. + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: structs.ConsulServiceID, + Service: structs.ConsulServiceName, + Port: 8000, + })) // Ensure everything is L7-capable. ensureConfigEntry(t, &structs.ProxyConfigEntry{ @@ -1896,6 +1914,16 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }) + // Consul should still never be exported, even if a resolver references it. + ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "consul-redirect", + Redirect: &structs.ServiceResolverRedirect{ + Service: structs.ConsulServiceName, + }, + EnterpriseMeta: *defaultEntMeta, + }) + require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() @@ -1910,8 +1938,12 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, // NOTE: no payments-proxy here + // NOTE: no consul here }, DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("consul-redirect"): { + Protocol: "http", + }, newSN("billing"): { Protocol: "http", }, @@ -1952,8 +1984,12 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, // NOTE: no payments-proxy here + // NOTE: no consul here }, DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("consul-redirect"): { + Protocol: "http", + }, newSN("payments"): { Protocol: "http", }, diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 143b236380..2188e3d0b2 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -646,6 +646,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { var resp *pbpeerstream.ReplicationMessage_Response switch { case strings.HasPrefix(update.CorrelationID, subExportedServiceList): + // TODO either filter here or at the source. resp, err = makeExportedServiceListResponse(status, update) if err != nil { // Log the error and skip this response to avoid locking up peering due to a bad update event. diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index 0f2e4bf79c..8bbdd21989 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -89,6 +89,8 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p state.publicUpdateCh = publicUpdateCh state.updateCh = updateCh + // TODO should I worry about all these goroutines emitting a `consul` object? + // Wrap our bare state store queries in goroutines that emit events. go m.notifyExportedServicesForPeerID(ctx, state, peerID) go m.notifyServerAddrUpdates(ctx, state.updateCh)