Prevent the "consul" service from being exported.

This commit is contained in:
Derek Menteer 2022-10-07 11:43:11 -05:00 committed by Derek Menteer
parent 3872a36d93
commit f366edcb8d
4 changed files with 51 additions and 0 deletions

View File

@ -788,6 +788,10 @@ func exportedServicesForPeerTxn(
// - use connect native mode // - use connect native mode
for _, svc := range conf.Services { for _, svc := range conf.Services {
// Prevent exporting the "consul" service.
if svc.Name == structs.ConsulServiceName {
continue
}
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
sawPeer := false sawPeer := false
@ -818,6 +822,10 @@ func exportedServicesForPeerTxn(
maxIdx = idx maxIdx = idx
} }
for _, s := range typicalServices { for _, s := range typicalServices {
// Prevent exporting the "consul" service.
if s.Service.Name == structs.ConsulServiceName {
continue
}
normalSet[s.Service] = struct{}{} normalSet[s.Service] = struct{}{}
} }
@ -830,6 +838,10 @@ func exportedServicesForPeerTxn(
maxIdx = idx maxIdx = idx
} }
for _, sn := range discoChains { for _, sn := range discoChains {
// Prevent exporting the "consul" service.
if sn.Name == structs.ConsulServiceName {
continue
}
discoSet[sn] = struct{}{} discoSet[sn] = struct{}{}
} }
} }

View File

@ -1745,6 +1745,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
Services: []structs.ExportedService{ Services: []structs.ExportedService{
{
// The "consul" service should never be exported.
Name: structs.ConsulServiceName,
Consumers: []structs.ServiceConsumer{
{Peer: "my-peering"},
},
},
{ {
Name: "mysql", Name: "mysql",
Consumers: []structs.ServiceConsumer{ Consumers: []structs.ServiceConsumer{
@ -1813,6 +1820,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: "billing", Service: "billing", Port: 5000, ID: "billing", Service: "billing", Port: 5000,
})) }))
// The consul service should never be exported.
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000,
}))
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
@ -1866,6 +1877,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
Service: "payments-proxy", Service: "payments-proxy",
Port: 5000, Port: 5000,
})) }))
// The consul service should never be exported.
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Port: 8000,
}))
// Ensure everything is L7-capable. // Ensure everything is L7-capable.
ensureConfigEntry(t, &structs.ProxyConfigEntry{ ensureConfigEntry(t, &structs.ProxyConfigEntry{
@ -1896,6 +1914,16 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}) })
// Consul should still never be exported, even if a resolver references it.
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "consul-redirect",
Redirect: &structs.ServiceResolverRedirect{
Service: structs.ConsulServiceName,
},
EnterpriseMeta: *defaultEntMeta,
})
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
@ -1910,8 +1938,12 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
// NOTE: no payments-proxy here // NOTE: no payments-proxy here
// NOTE: no consul here
}, },
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("consul-redirect"): {
Protocol: "http",
},
newSN("billing"): { newSN("billing"): {
Protocol: "http", Protocol: "http",
}, },
@ -1952,8 +1984,12 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
// NOTE: no payments-proxy here // NOTE: no payments-proxy here
// NOTE: no consul here
}, },
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("consul-redirect"): {
Protocol: "http",
},
newSN("payments"): { newSN("payments"): {
Protocol: "http", Protocol: "http",
}, },

View File

@ -646,6 +646,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
var resp *pbpeerstream.ReplicationMessage_Response var resp *pbpeerstream.ReplicationMessage_Response
switch { switch {
case strings.HasPrefix(update.CorrelationID, subExportedServiceList): case strings.HasPrefix(update.CorrelationID, subExportedServiceList):
// TODO either filter here or at the source.
resp, err = makeExportedServiceListResponse(status, update) resp, err = makeExportedServiceListResponse(status, update)
if err != nil { if err != nil {
// Log the error and skip this response to avoid locking up peering due to a bad update event. // Log the error and skip this response to avoid locking up peering due to a bad update event.

View File

@ -89,6 +89,8 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
state.publicUpdateCh = publicUpdateCh state.publicUpdateCh = publicUpdateCh
state.updateCh = updateCh state.updateCh = updateCh
// TODO should I worry about all these goroutines emitting a `consul` object?
// Wrap our bare state store queries in goroutines that emit events. // Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID) go m.notifyExportedServicesForPeerID(ctx, state, peerID)
go m.notifyServerAddrUpdates(ctx, state.updateCh) go m.notifyServerAddrUpdates(ctx, state.updateCh)