Fix issue with peer services incorrectly appearing as connect-enabled. (#16339)

Prior to this commit, all peer services were transmitted as connect-enabled
as long as a one or more mesh-gateways were healthy. With this change, there
is now a difference between typical services and connect services transmitted
via peering.

A service will be reported as "connect-enabled" as long as any of these
conditions are met:

1. a connect-proxy sidecar is registered for the service name.
2. a connect-native instance of the service is registered.
3. a service resolver / splitter / router is registered for the service name.
4. a terminating gateway has registered the service.
This commit is contained in:
Derek Menteer 2023-02-21 13:59:36 -06:00 committed by GitHub
parent 7f9ec78932
commit ad865f549b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 448 additions and 116 deletions

3
.changelog/16339.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
peering: Fix bug where services were incorrectly imported as connect-enabled.
```

View File

@ -900,12 +900,17 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if svc.PeerName == "" && sn.Name != "" {
if err := upsertKindServiceName(tx, idx, structs.ServiceKindConnectEnabled, sn); err != nil {
return fmt.Errorf("failed to persist service name as connect-enabled: %v", err)
}
}
// Update the virtual IP for the service
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
}
// Update the virtual IP for the service
if supported {
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
vip, err := assignServiceVirtualIP(tx, idx, psn)
@ -1964,6 +1969,24 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
}
// Cleanup ConnectEnabled for this service if none exist.
if svc.PeerName == "" && (svc.ServiceKind == structs.ServiceKindConnectProxy || svc.ServiceConnect.Native) {
service := svc.ServiceName
if svc.ServiceKind == structs.ServiceKindConnectProxy {
service = svc.ServiceProxy.DestinationServiceName
}
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
connectEnabled, err := serviceHasConnectEnabledInstances(tx, sn.Name, &sn.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to search for connect instances for service %q: %w", sn.Name, err)
}
if !connectEnabled {
if err := cleanupKindServiceName(tx, idx, sn, structs.ServiceKindConnectEnabled); err != nil {
return fmt.Errorf("failed to cleanup connect-enabled service name: %v", err)
}
}
}
if svc.PeerName == "" {
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil {
@ -3731,6 +3754,27 @@ func serviceHasConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.En
return hasConnectInstance, hasNonConnectInstance, nil
}
// serviceHasConnectEnabledInstances returns whether the given service name
// has a corresponding connect-proxy or connect-native instance.
// This function is mostly a clone of `serviceHasConnectInstances`, but it has
// an early return to improve performance and returns true if at least one
// connect-native instance exists.
func serviceHasConnectEnabledInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, error) {
query := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
svc, err := tx.First(tableServices, indexConnect, query)
if err != nil {
return false, fmt.Errorf("failed service lookup: %w", err)
}
if svc != nil {
return true, nil
}
return false, nil
}
// updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {

View File

@ -8664,7 +8664,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
},
}
var idx uint64
var idx, connectEnabledIdx uint64
testRegisterNode(t, s, idx, "node1")
for _, svc := range services {
@ -8678,7 +8678,28 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
require.Len(t, gotNames, 1)
require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service)
require.Equal(t, svc.Kind, gotNames[0].Kind)
if svc.Kind == structs.ServiceKindConnectProxy {
connectEnabledIdx = idx
}
}
// A ConnectEnabled service should exist if a corresponding ConnectProxy or ConnectNative service exists.
verifyConnectEnabled := func(expectIdx uint64) {
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled)
require.NoError(t, err)
require.Equal(t, expectIdx, gotIdx)
require.Equal(t, []*KindServiceName{
{
Kind: structs.ServiceKindConnectEnabled,
Service: structs.NewServiceName("foo", entMeta),
RaftIndex: structs.RaftIndex{
CreateIndex: connectEnabledIdx,
ModifyIndex: connectEnabledIdx,
},
},
}, gotNames)
}
verifyConnectEnabled(connectEnabledIdx)
// Register another ingress gateway and there should be two names under the kind index
newIngress := structs.NodeService{
@ -8749,6 +8770,38 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Empty(t, got)
// A ConnectEnabled entry should not be removed until all corresponding services are removed.
{
verifyConnectEnabled(connectEnabledIdx)
// Add a connect-native service.
idx++
require.NoError(t, s.EnsureService(idx, "node1", &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "foo",
Service: "foo",
Address: "5.5.5.5",
Port: 5555,
EnterpriseMeta: *entMeta,
Connect: structs.ServiceConnect{
Native: true,
},
}))
verifyConnectEnabled(connectEnabledIdx)
// Delete the proxy. This should not clean up the entry, because we still have a
// connect-native service registered.
idx++
require.NoError(t, s.DeleteService(idx, "node1", "connect-proxy", entMeta, ""))
verifyConnectEnabled(connectEnabledIdx)
// Remove the connect-native service to clear out the connect-enabled entry.
require.NoError(t, s.DeleteService(idx, "node1", "foo", entMeta, ""))
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Empty(t, gotNames)
}
}
func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) {

View File

@ -770,73 +770,88 @@ func exportedServicesForPeerTxn(
maxIdx := peering.ModifyIndex
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
idx, conf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta)
idx, exportConf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch exported-services config entry: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
if conf == nil {
if exportConf == nil {
return maxIdx, &structs.ExportedServiceList{}, nil
}
var (
normalSet = make(map[structs.ServiceName]struct{})
discoSet = make(map[structs.ServiceName]struct{})
// exportedServices will contain the listing of all service names that are being exported
// and will need to be queried for connect / discovery chain information.
exportedServices = make(map[structs.ServiceName]struct{})
// exportedConnectServices will contain the listing of all connect service names that are being exported.
exportedConnectServices = make(map[structs.ServiceName]struct{})
// namespaceConnectServices provides a listing of all connect service names for a particular partition+namespace pair.
namespaceConnectServices = make(map[acl.EnterpriseMeta]map[string]struct{})
// namespaceDiscoChains provides a listing of all disco chain names for a particular partition+namespace pair.
namespaceDiscoChains = make(map[acl.EnterpriseMeta]map[string]struct{})
)
// At least one of the following should be true for a name for it to
// replicate:
//
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
// - have an explicit sidecar kind=connect-proxy
// - use connect native mode
// Helper function for inserting data and auto-creating maps.
insertEntry := func(m map[acl.EnterpriseMeta]map[string]struct{}, entMeta acl.EnterpriseMeta, name string) {
names, ok := m[entMeta]
if !ok {
names = make(map[string]struct{})
m[entMeta] = names
}
names[name] = struct{}{}
}
for _, svc := range conf.Services {
// Build the set of all services that will be exported.
// Any possible namespace wildcards or "consul" services should be removed by this step.
for _, svc := range exportConf.Services {
// Prevent exporting the "consul" service.
if svc.Name == structs.ConsulServiceName {
continue
}
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
svcEntMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
svcName := structs.NewServiceName(svc.Name, &svcEntMeta)
sawPeer := false
peerFound := false
for _, consumer := range svc.Consumers {
name := structs.NewServiceName(svc.Name, &svcMeta)
if _, ok := normalSet[name]; ok {
// Service was covered by a wildcard that was already accounted for
if consumer.Peer == peering.Name {
peerFound = true
break
}
}
// Only look for more information if the matching peer was found.
if !peerFound {
continue
}
if consumer.Peer != peering.Name {
continue
}
sawPeer = true
// If this isn't a wildcard, we can simply add it to the list of services to watch and move to the next entry.
if svc.Name != structs.WildcardSpecifier {
normalSet[name] = struct{}{}
}
exportedServices[svcName] = struct{}{}
continue
}
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
if sawPeer && svc.Name == structs.WildcardSpecifier {
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
// If all services in the namespace are exported by the wildcard, query those service names.
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcEntMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, s := range typicalServices {
for _, sn := range typicalServices {
// Prevent exporting the "consul" service.
if s.Service.Name == structs.ConsulServiceName {
continue
if sn.Service.Name != structs.ConsulServiceName {
exportedServices[sn.Service] = struct{}{}
}
normalSet[s.Service] = struct{}{}
}
// list all config entries of kind service-resolver, service-router, service-splitter?
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcMeta)
// List all config entries of kind service-resolver, service-router, service-splitter, because they
// will be exported as connect services.
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcEntMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
}
@ -844,14 +859,92 @@ func exportedServicesForPeerTxn(
maxIdx = idx
}
for _, sn := range discoChains {
discoSet[sn] = struct{}{}
// Prevent exporting the "consul" service.
if sn.Name != structs.ConsulServiceName {
exportedConnectServices[sn] = struct{}{}
insertEntry(namespaceDiscoChains, svcEntMeta, sn.Name)
}
}
}
normal := maps.SliceOfKeys(normalSet)
disco := maps.SliceOfKeys(discoSet)
// At least one of the following should be true for a name to replicate it as a *connect* service:
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
// - have an explicit sidecar kind=connect-proxy
// - use connect native mode
// - are registered with a terminating gateway
populateConnectService := func(sn structs.ServiceName) error {
// Load all disco-chains in this namespace if we haven't already.
if _, ok := namespaceDiscoChains[sn.EnterpriseMeta]; !ok {
// Check to see if we have a discovery chain with the same name.
idx, chains, err := listDiscoveryChainNamesTxn(tx, ws, nil, sn.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to get connect services: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, sn := range chains {
insertEntry(namespaceDiscoChains, sn.EnterpriseMeta, sn.Name)
}
}
// Check to see if we have the connect service.
if _, ok := namespaceDiscoChains[sn.EnterpriseMeta][sn.Name]; ok {
exportedConnectServices[sn] = struct{}{}
// Do not early return because we have multiple watches that should be established.
}
// Load all services in this namespace if we haven't already.
if _, ok := namespaceConnectServices[sn.EnterpriseMeta]; !ok {
// This is more efficient than querying the service instance table.
idx, connectServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectEnabled, sn.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to get connect services: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, ksn := range connectServices {
insertEntry(namespaceConnectServices, sn.EnterpriseMeta, ksn.Service.Name)
}
}
// Check to see if we have the connect service.
if _, ok := namespaceConnectServices[sn.EnterpriseMeta][sn.Name]; ok {
exportedConnectServices[sn] = struct{}{}
// Do not early return because we have multiple watches that should be established.
}
// Check if the service is exposed via terminating gateways.
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %w", sn.Name, err)
}
ws.Add(svcGateways.WatchCh())
for svc := svcGateways.Next(); svc != nil; svc = svcGateways.Next() {
gs, ok := svc.(*structs.GatewayService)
if !ok {
return fmt.Errorf("failed converting to GatewayService for %q", sn.Name)
}
if gs.GatewayKind == structs.ServiceKindTerminatingGateway {
exportedConnectServices[sn] = struct{}{}
break
}
}
return nil
}
// Perform queries and check if each service is connect-enabled.
for sn := range exportedServices {
// Do not query for data if we already know it's a connect service.
if _, ok := exportedConnectServices[sn]; ok {
continue
}
if err := populateConnectService(sn); err != nil {
return 0, nil, err
}
}
// Fetch the protocol / targets for connect services.
chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo)
populateChainInfo := func(svc structs.ServiceName) error {
if _, ok := chainInfo[svc]; ok {
@ -899,21 +992,17 @@ func exportedServicesForPeerTxn(
return nil
}
for _, svc := range normal {
if err := populateChainInfo(svc); err != nil {
return 0, nil, err
}
}
for _, svc := range disco {
for svc := range exportedConnectServices {
if err := populateChainInfo(svc); err != nil {
return 0, nil, err
}
}
structs.ServiceList(normal).Sort()
sortedServices := maps.SliceOfKeys(exportedServices)
structs.ServiceList(sortedServices).Sort()
list := &structs.ExportedServiceList{
Services: normal,
Services: sortedServices,
DiscoChains: chainInfo,
}

View File

@ -1908,18 +1908,28 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
},
},
{
// Should be exported as both a normal and disco chain (resolver).
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{Peer: "my-peering"},
},
},
{
// Should be exported as both a normal and disco chain (connect-proxy).
Name: "redis",
Consumers: []structs.ServiceConsumer{
{Peer: "my-peering"},
},
},
{
// Should only be exported as a normal service.
Name: "prometheus",
Consumers: []structs.ServiceConsumer{
{Peer: "my-peering"},
},
},
{
// Should not be exported (different peer consumer)
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{Peer: "my-other-peering"},
@ -1932,12 +1942,37 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
// Register extra things so that disco chain entries appear.
lastIdx++
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{
Node: "node1", Address: "10.0.0.1",
}))
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "node1", &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-sidecar-proxy",
Service: "redis-sidecar-proxy",
Port: 5005,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "redis",
},
}))
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mysql",
EnterpriseMeta: *defaultEntMeta,
})
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
{
Name: "mysql",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "prometheus",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "redis",
EnterpriseMeta: *defaultEntMeta,
@ -1998,17 +2033,21 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
ws = memdb.NewWatchSet()
expect := &structs.ExportedServiceList{
// Only "billing" shows up, because there are no other service instances running,
// and "consul" is never exported.
Services: []structs.ServiceName{
{
Name: "billing",
EnterpriseMeta: *defaultEntMeta,
},
},
// Only "mysql" appears because there it has a service resolver.
// "redis" does not appear, because it's a sidecar proxy without a corresponding service, so the wildcard doesn't find it.
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("billing"): {
newSN("mysql"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("billing", "", "dc1"),
newTarget("mysql", "", "dc1"),
},
},
},
@ -2025,13 +2064,17 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
ID: "payments", Service: "payments", Port: 5000,
}))
// The proxy will be ignored.
// The proxy will cause "payments" to be output in the disco chains. It will NOT be output
// in the normal services list.
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "payments-proxy",
Service: "payments-proxy",
Port: 5000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "payments",
},
}))
lastIdx++
// The consul service should never be exported.
@ -2099,10 +2142,11 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
},
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
// NOTE: no consul-redirect here
newSN("billing"): {
// NOTE: no billing here, because it does not have a proxy.
newSN("payments"): {
Protocol: "http",
},
newSN("payments"): {
newSN("mysql"): {
Protocol: "http",
},
newSN("resolver"): {
@ -2129,6 +2173,9 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
lastIdx++
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
lastIdx++
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceResolver, "mysql", nil))
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
@ -2160,6 +2207,51 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.Equal(t, expect, got)
})
testutil.RunStep(t, "terminating gateway services are exported", func(t *testing.T) {
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: "term-svc", Service: "term-svc", Port: 6000,
}))
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "some-terminating-gateway",
ID: "some-terminating-gateway",
Port: 9000,
}))
lastIdx++
require.NoError(t, s.EnsureConfigEntry(lastIdx, &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "some-terminating-gateway",
Services: []structs.LinkedService{{Name: "term-svc"}},
}))
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
newSN("payments"),
newSN("term-svc"),
},
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("payments"): {
Protocol: "http",
},
newSN("resolver"): {
Protocol: "http",
},
newSN("router"): {
Protocol: "http",
},
newSN("term-svc"): {
Protocol: "http",
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
})
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
expect := &structs.ExportedServiceList{}

View File

@ -844,6 +844,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
}
mysqlSidecar := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
Service: "mysql-sidecar-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "mysql",
},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
@ -851,6 +858,9 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysqlSidecar))
mongoSvcDefaults := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mongo",
@ -870,6 +880,24 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
testutil.RunStep(t, "initial stream data is received", func(t *testing.T) {
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
// Roots tested in TestStreamResources_Server_CARootUpdates
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var exportedServices pbpeerstream.ExportedServiceList
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices))
require.ElementsMatch(t, []string{}, exportedServices.Services)
},
)
})
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
@ -895,10 +923,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
// Roots tested in TestStreamResources_Server_CARootUpdates
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// no mongo instances exist
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
@ -909,16 +933,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 0)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 0)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
@ -938,17 +952,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 0)
},
// This event happens because this is the first test case and there are
// no exported services when replication is initially set up.
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var exportedServices pbpeerstream.ExportedServiceList
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices))
require.ElementsMatch(t, []string{}, exportedServices.Services)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
@ -976,23 +979,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, "grpc", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
},
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
@ -1013,6 +999,33 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
)
})
testutil.RunStep(t, "register service resolver to send proxy updates", func(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mongo",
}))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbpeerstream.ExportedService
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, "grpc", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
},
)
})
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},

View File

@ -143,7 +143,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
pending := &pendingPayload{}
m.syncNormalServices(ctx, state, evt.Services)
if m.config.ConnectEnabled {
m.syncDiscoveryChains(state, pending, evt.ListAllDiscoveryChains())
m.syncDiscoveryChains(state, pending, evt.DiscoChains)
}
err := pending.Add(

View File

@ -472,15 +472,40 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
}
mysqlSidecar := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
Service: "mysql-sidecar-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "mysql",
},
}
backend.ensureNode(t, mysql.Node)
backend.ensureService(t, "foo", mysql.Service)
backend.ensureService(t, "foo", &mysqlSidecar)
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
Service: &structs.NodeService{
ID: "mongo-1",
Service: "mongo",
Port: 5000,
},
}
mongoSidecar := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
Service: "mongo-sidecar-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "mongo",
},
}
backend.ensureNode(t, mongo.Node)
backend.ensureService(t, "zip", mongo.Service)
backend.ensureService(t, "zip", &mongoSidecar)
backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "chain",
})
var (
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()

View File

@ -49,6 +49,14 @@ func TestServerExportedPeeredServices(t *testing.T) {
},
}))
// Create resolvers for each of the services so that they are guaranteed to be replicated by the peer stream.
for _, s := range []string{"web", "api", "db"} {
require.NoError(t, store.EnsureConfigEntry(0, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: s,
}))
}
authz := policyAuthorizer(t, `
service "web" { policy = "read" }
service "api" { policy = "read" }

View File

@ -62,8 +62,7 @@ func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool {
return true
}
// ListAllDiscoveryChains returns all discovery chains (union of Services and
// DiscoChains).
// ListAllDiscoveryChains returns all discovery chains (union of Services and DiscoChains).
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo {
chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo)
if list == nil {

View File

@ -1235,6 +1235,12 @@ const (
// This service allows external traffic to exit the mesh through a terminating gateway
// based on centralized configuration.
ServiceKindDestination ServiceKind = "destination"
// ServiceKindConnectEnabled is used to indicate whether a service is either
// connect-native or if the service has a corresponding sidecar. It is used for
// internal query purposes and should not be exposed to users as a valid Kind
// option.
ServiceKindConnectEnabled ServiceKind = "connect-enabled"
)
// Type to hold a address and port of a service