From 499211f9078e53640f1335fdc0c98374d8206ced Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 28 Jul 2022 12:51:01 -0700 Subject: [PATCH 1/3] Fix wildcard picking up services it shouldn't for ingress/terminating gateways --- agent/consul/state/catalog.go | 96 ++++++++++++++++++++++++-- agent/consul/state/catalog_test.go | 44 ++++++++---- agent/consul/state/config_entry.go | 4 +- agent/consul/state/state_store_test.go | 6 ++ 4 files changed, 128 insertions(+), 22 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 849d0820c3..a280ba9bf4 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -871,7 +871,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" { // Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence sn := structs.NewServiceName(svc.Service, &svc.EnterpriseMeta) - if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { + if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayServiceKindService); err != nil { @@ -1984,11 +1984,6 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta, svc.PeerName); err != nil { return err } - if svc.PeerName == "" { - if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { - return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) - } - } psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) @@ -2001,6 +1996,12 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) } + if svc.PeerName == "" { + if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { + return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) + } + } + return nil } @@ -3652,6 +3653,18 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } + supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta) + if err != nil { + return err + } + + if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + continue + } + if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + continue + } + existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) @@ -3717,6 +3730,42 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer return nil } +// serviceConnectInstances returns whether the service has at least one connect instance, +// and at least one non-connect instance. +func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) { + hasConnectInstance := false + connectQuery := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } + svc, err := tx.First(tableServices, indexConnect, connectQuery) + if err != nil { + return false, false, fmt.Errorf("failed service lookup: %s", err) + } + if svc != nil { + hasConnectInstance = true + } + + hasNonConnectInstance := false + nonConnectQuery := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } + iter, err := tx.Get(tableServices, indexService, nonConnectQuery) + if err != nil { + return false, false, fmt.Errorf("failed service lookup: %s", err) + } + for service := iter.Next(); service != nil; service = iter.Next() { + sn := service.(*structs.ServiceNode) + if !sn.ServiceConnect.Native { + hasNonConnectInstance = true + break + } + } + + return hasConnectInstance, hasNonConnectInstance, nil +} + // updateGatewayService associates services with gateways after an eligible event // ie. Registering a service in a namespace targeted by a gateway func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { @@ -3754,14 +3803,31 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi // checkWildcardForGatewaysAndUpdate checks whether a service matches a // wildcard definition in gateway config entries and if so adds it the the // gateway-services table. -func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error { +func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ns *structs.NodeService, kind structs.GatewayServiceKind) error { sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta} svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) if err != nil { return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err) } + + supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta) + if err != nil { + return err + } + if ns != nil && ns.Connect.Native { + supportsIngress = true + } else { + supportsTerminating = true + } + for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil { + if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + continue + } + if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + continue + } // Copy the wildcard mapping and modify it gatewaySvc := wildcardSvc.Clone() @@ -3818,12 +3884,28 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) } } + // Check whether there are any connect or non-connect instances remaining for this service. + // If there are no connect instances left, ingress gateways with a wildcard entry can remove + // their association with it (same with terminating gateways if there are no non-connect + // instances left). + hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } + // Do the updates in a separate loop so we don't trash the iterator. for _, m := range mappings { // Only delete if association was created by a wildcard specifier. // Otherwise the service was specified in the config entry, and the association should be maintained // for when the service is re-registered if m.FromWildcard { + if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance { + continue + } + if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance { + continue + } + if err := tx.Delete(tableGatewayServices, m); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index fed3bd0ee3..357844dc01 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -4,13 +4,14 @@ import ( "context" crand "crypto/rand" "fmt" - "github.com/hashicorp/consul/acl" "reflect" "sort" "strings" "testing" "time" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/assert" @@ -5753,6 +5754,10 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) assert.Nil(t, s.EnsureService(14, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + // Connect services (should be ignored by terminating gateway) + assert.Nil(t, s.EnsureService(15, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000})) + assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Connect: structs.ServiceConnect{Native: true}, Port: 5000})) + // Register two gateways assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443})) @@ -5895,6 +5900,16 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { }, } assert.Equal(t, expect, out) + + // Delete the non-connect instance of api + assert.Nil(t, s.DeleteService(21, "foo", "api", nil, "")) + + // Gateway with wildcard entry should have no services left, because the last + // non-connect instance of 'api' was deleted. + idx, out, err = s.GatewayServices(ws, "other-gateway", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(21)) + assert.Empty(t, out) } func TestStateStore_CheckIngressServiceNodes(t *testing.T) { @@ -5904,7 +5919,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { t.Run("check service1 ingress gateway", func(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) // Multiple instances of the ingress2 service require.Len(t, results, 4) @@ -5923,7 +5938,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { t.Run("check service2 ingress gateway", func(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service2", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 2) ids := make(map[string]struct{}) @@ -5941,7 +5956,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { ws := memdb.NewWatchSet() idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 1) require.Equal(t, "wildcardIngress", results[0].Service.ID) }) @@ -5952,17 +5967,17 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 3) idx, results, err = s.CheckIngressServiceNodes(ws, "service2", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) require.Len(t, results, 1) idx, results, err = s.CheckIngressServiceNodes(ws, "service3", nil) require.NoError(t, err) - require.Equal(t, uint64(15), idx) + require.Equal(t, uint64(18), idx) // TODO(ingress): index goes backward when deleting last config entry // require.Equal(t,uint64(11), idx) require.Len(t, results, 0) @@ -6346,8 +6361,8 @@ func TestStateStore_GatewayServices_IngressProtocolFiltering(t *testing.T) { } testRegisterNode(t, s, 0, "node1") - testRegisterService(t, s, 1, "node1", "service1") - testRegisterService(t, s, 2, "node1", "service2") + testRegisterConnectService(t, s, 1, "node1", "service1") + testRegisterConnectService(t, s, 2, "node1", "service2") assert.NoError(t, s.EnsureConfigEntry(4, ingress1)) }) @@ -6510,15 +6525,17 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 1, "node2") - // Register a service against the nodes. + // Register some connect and non-connect services against the nodes. testRegisterIngressService(t, s, 3, "node1", "wildcardIngress") testRegisterIngressService(t, s, 4, "node1", "ingress1") testRegisterIngressService(t, s, 5, "node1", "ingress2") testRegisterIngressService(t, s, 6, "node2", "ingress2") testRegisterIngressService(t, s, 7, "node1", "nothingIngress") - testRegisterService(t, s, 8, "node1", "service1") - testRegisterService(t, s, 9, "node2", "service2") - testRegisterService(t, s, 10, "node2", "service3") + testRegisterConnectService(t, s, 8, "node1", "service1") + testRegisterConnectService(t, s, 9, "node2", "service2") + testRegisterConnectService(t, s, 10, "node2", "service3") + testRegisterService(t, s, 17, "node1", "service4") + testRegisterService(t, s, 18, "node2", "service5") // Default protocol to http proxyDefaults := &structs.ProxyConfigEntry{ @@ -7883,6 +7900,7 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) { Address: "127.0.0.3", Port: 443, EnterpriseMeta: *defaultMeta, + Connect: structs.ServiceConnect{Native: true}, } require.NoError(t, s.EnsureService(5, "foo", &svc)) assert.True(t, watchFired(ws)) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index af0fe21133..f5b15115c3 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -371,7 +371,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a gsKind = structs.GatewayServiceKindUnknown } serviceName := structs.NewServiceName(c.GetName(), c.GetEnterpriseMeta()) - if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, gsKind); err != nil { + if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil { @@ -434,7 +434,7 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry) if err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } - if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, gsKind); err != nil { + if err := checkGatewayWildcardsAndUpdate(tx, idx, &sn, nil, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } if err := checkGatewayAndUpdate(tx, idx, &sn, gsKind); err != nil { diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 4946c50f21..c8460ca821 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -193,6 +193,12 @@ func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID s testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) } +func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { + testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, true, func(service *structs.NodeService) { + service.Connect = structs.ServiceConnect{Native: true} + }) +} + func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { svc := &structs.NodeService{ ID: serviceID, From 6580566c3b77ee710d5c5b213f6376ae9e62acbb Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 2 Aug 2022 09:41:19 -0700 Subject: [PATCH 2/3] Update ingress/terminating wildcard logic and handle destinations --- .changelog/13958.txt | 4 ++ agent/consul/state/catalog.go | 51 ++++++++++--------- agent/consul/state/catalog_test.go | 79 +++++++++++++++++++++++++++--- 3 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 .changelog/13958.txt diff --git a/.changelog/13958.txt b/.changelog/13958.txt new file mode 100644 index 0000000000..a64a0026b4 --- /dev/null +++ b/.changelog/13958.txt @@ -0,0 +1,4 @@ +```release-note:bug +connect: Ingress gateways with a wildcard service entry should no longer pick up non-connect services as upstreams. +connect: Terminating gateways with a wildcard service entry should no longer pick up connect services as upstreams. +``` diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index a280ba9bf4..578d97e5f2 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -890,6 +890,15 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool return fmt.Errorf("failed updating upstream/downstream association") } + service := svc.Service + if svc.Kind == structs.ServiceKindConnectProxy { + service = svc.Proxy.DestinationServiceName + } + sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta} + if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, svc, structs.GatewayServiceKindService); err != nil { + return fmt.Errorf("failed updating gateway mapping: %s", err) + } + supported, err := virtualIPsSupported(tx, nil) if err != nil { return err @@ -897,12 +906,6 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool // Update the virtual IP for the service if supported { - service := svc.Service - if svc.Kind == structs.ServiceKindConnectProxy { - service = svc.Proxy.DestinationServiceName - } - - sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta} psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn} vip, err := assignServiceVirtualIP(tx, idx, psn) if err != nil { @@ -3653,7 +3656,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } - supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, sn.ServiceName, entMeta) + supportsIngress, supportsTerminating, err := serviceHasConnectInstances(tx, sn.ServiceName, entMeta) if err != nil { return err } @@ -3730,15 +3733,15 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer return nil } -// serviceConnectInstances returns whether the service has at least one connect instance, +// serviceHasConnectInstances returns whether the service has at least one connect instance, // and at least one non-connect instance. -func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) { +func serviceHasConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, bool, error) { hasConnectInstance := false - connectQuery := Query{ + query := Query{ Value: serviceName, EnterpriseMeta: *entMeta, } - svc, err := tx.First(tableServices, indexConnect, connectQuery) + svc, err := tx.First(tableServices, indexConnect, query) if err != nil { return false, false, fmt.Errorf("failed service lookup: %s", err) } @@ -3747,11 +3750,7 @@ func serviceConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.Enter } hasNonConnectInstance := false - nonConnectQuery := Query{ - Value: serviceName, - EnterpriseMeta: *entMeta, - } - iter, err := tx.Get(tableServices, indexService, nonConnectQuery) + iter, err := tx.Get(tableServices, indexService, query) if err != nil { return false, false, fmt.Errorf("failed service lookup: %s", err) } @@ -3810,22 +3809,26 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.Servic return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err) } - supportsIngress, supportsTerminating, err := serviceConnectInstances(tx, svc.Name, &svc.EnterpriseMeta) + hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, svc.Name, &svc.EnterpriseMeta) if err != nil { return err } - if ns != nil && ns.Connect.Native { - supportsIngress = true - } else { - supportsTerminating = true + // If we were passed a NodeService, this might be the first registered instance of the service + // so we need to count it as either a connect or non-connect instance. + if ns != nil { + if ns.Connect.Native || ns.Kind == structs.ServiceKindConnectProxy { + hasConnectInstance = true + } else { + hasNonConnectInstance = true + } } for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil { - if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + if wildcardSvc.GatewayKind == structs.ServiceKindIngressGateway && !hasConnectInstance { continue } - if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + if wildcardSvc.GatewayKind == structs.ServiceKindTerminatingGateway && !hasNonConnectInstance && kind != structs.GatewayServiceKindDestination { continue } @@ -3888,7 +3891,7 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) // If there are no connect instances left, ingress gateways with a wildcard entry can remove // their association with it (same with terminating gateways if there are no non-connect // instances left). - hasConnectInstance, hasNonConnectInstance, err := serviceConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta) + hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta) if err != nil { return err } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 357844dc01..501a7e0d38 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6320,23 +6320,80 @@ func TestStateStore_GatewayServices_WildcardAssociation(t *testing.T) { }) t.Run("do not associate connect-proxy services with gateway", func(t *testing.T) { + // Should only associate web (the destination service of the proxy), not the + // sidecar service name itself. testRegisterSidecarProxy(t, s, 19, "node1", "web") - require.False(t, watchFired(ws)) + expected := structs.GatewayServices{ + { + Gateway: structs.NewServiceName("wildcardIngress", nil), + Service: structs.NewServiceName("service1", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Port: 4444, + Protocol: "http", + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 12, + ModifyIndex: 12, + }, + }, + { + Gateway: structs.NewServiceName("wildcardIngress", nil), + Service: structs.NewServiceName("service2", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Port: 4444, + Protocol: "http", + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 12, + ModifyIndex: 12, + }, + }, + { + Gateway: structs.NewServiceName("wildcardIngress", nil), + Service: structs.NewServiceName("service3", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Port: 4444, + Protocol: "http", + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 12, + ModifyIndex: 12, + }, + }, + { + Gateway: structs.NewServiceName("wildcardIngress", nil), + Service: structs.NewServiceName("web", nil), + ServiceKind: structs.GatewayServiceKindService, + GatewayKind: structs.ServiceKindIngressGateway, + Port: 4444, + Protocol: "http", + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 19, + ModifyIndex: 19, + }, + }, + } + idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil) require.NoError(t, err) - require.Equal(t, uint64(16), idx) - require.Len(t, results, 3) + require.Equal(t, uint64(19), idx) + require.ElementsMatch(t, results, expected) }) t.Run("do not associate consul services with gateway", func(t *testing.T) { + ws := memdb.NewWatchSet() + _, _, err := s.GatewayServices(ws, "wildcardIngress", nil) + require.NoError(t, err) + require.Nil(t, s.EnsureService(20, "node1", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil}, )) require.False(t, watchFired(ws)) idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil) require.NoError(t, err) - require.Equal(t, uint64(16), idx) - require.Len(t, results, 3) + require.Equal(t, uint64(19), idx) + require.Len(t, results, 4) }) } @@ -6525,7 +6582,7 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 1, "node2") - // Register some connect and non-connect services against the nodes. + // Register some connect services against the nodes. testRegisterIngressService(t, s, 3, "node1", "wildcardIngress") testRegisterIngressService(t, s, 4, "node1", "ingress1") testRegisterIngressService(t, s, 5, "node1", "ingress2") @@ -6533,7 +6590,15 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { testRegisterIngressService(t, s, 7, "node1", "nothingIngress") testRegisterConnectService(t, s, 8, "node1", "service1") testRegisterConnectService(t, s, 9, "node2", "service2") - testRegisterConnectService(t, s, 10, "node2", "service3") + testRegisterService(t, s, 10, "node2", "service3") + testRegisterServiceWithChangeOpts(t, s, 11, "node2", "service3-proxy", false, func(service *structs.NodeService) { + service.Kind = structs.ServiceKindConnectProxy + service.Proxy = structs.ConnectProxyConfig{ + DestinationServiceName: "service3", + } + }) + + // Register some non-connect services - these shouldn't be picked up by a wildcard. testRegisterService(t, s, 17, "node1", "service4") testRegisterService(t, s, 18, "node2", "service5") From fe1fcea34fabbc5039a9eb063eef662a919635a6 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 8 Aug 2022 10:44:18 -0700 Subject: [PATCH 3/3] Add some extra handling for destination deletes --- agent/consul/state/catalog.go | 37 +++-- agent/consul/state/catalog_test.go | 61 ++++++- agent/consul/state/config_entry.go | 3 + agent/consul/state/config_entry_test.go | 204 +++++++++++++++--------- 4 files changed, 221 insertions(+), 84 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 578d97e5f2..258519d5ba 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -11,6 +11,7 @@ import ( "github.com/mitchellh/copystructure" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -2000,7 +2001,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } if svc.PeerName == "" { - if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { + sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta} + if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil { return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) } } @@ -3656,15 +3658,15 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } - supportsIngress, supportsTerminating, err := serviceHasConnectInstances(tx, sn.ServiceName, entMeta) + hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, sn.ServiceName, entMeta) if err != nil { return err } - if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress { + if service.GatewayKind == structs.ServiceKindIngressGateway && !hasConnectInstance { continue } - if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating { + if service.GatewayKind == structs.ServiceKindTerminatingGateway && !hasNonConnectInstance { continue } @@ -3872,12 +3874,11 @@ func checkGatewayAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ki return nil } -func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error { +func cleanupGatewayWildcards(tx WriteTxn, idx uint64, sn structs.ServiceName, cleaningUpDestination bool) error { // Clean up association between service name and gateways if needed - sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta} gateways, err := tx.Get(tableGatewayServices, indexService, sn) if err != nil { - return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err) + return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err) } mappings := make([]*structs.GatewayService, 0) @@ -3891,11 +3892,27 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) // If there are no connect instances left, ingress gateways with a wildcard entry can remove // their association with it (same with terminating gateways if there are no non-connect // instances left). - hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta) + hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, sn.Name, &sn.EnterpriseMeta) if err != nil { return err } + // If we're deleting a service instance but this service is defined as a destination via config entry, + // keep the mapping around. + hasDestination := false + if !cleaningUpDestination { + q := configentry.NewKindName(structs.ServiceDefaults, sn.Name, &sn.EnterpriseMeta) + existing, err := tx.First(tableConfigEntries, indexID, q) + if err != nil { + return fmt.Errorf("failed config entry lookup: %s", err) + } + if existing != nil { + if entry, ok := existing.(*structs.ServiceConfigEntry); ok && entry.Destination != nil { + hasDestination = true + } + } + } + // Do the updates in a separate loop so we don't trash the iterator. for _, m := range mappings { // Only delete if association was created by a wildcard specifier. @@ -3905,7 +3922,7 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance { continue } - if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance { + if m.GatewayKind == structs.ServiceKindTerminatingGateway && (hasNonConnectInstance || hasDestination) { continue } @@ -3921,7 +3938,7 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) } else { kind, err := GatewayServiceKind(tx, m.Service.Name, &m.Service.EnterpriseMeta) if err != nil { - return fmt.Errorf("failed to get gateway service kind for service %s: %v", svc.ServiceName, err) + return fmt.Errorf("failed to get gateway service kind for service %s: %v", sn.Name, err) } checkGatewayAndUpdate(tx, idx, &structs.ServiceName{Name: m.Service.Name, EnterpriseMeta: m.Service.EnterpriseMeta}, kind) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 501a7e0d38..10e7af6dba 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -5338,13 +5338,70 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) { } assert.Equal(t, expect, out) + // Add a destination via config entry and make sure it's picked up by the wildcard. + configEntryDest := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "destination1", + Destination: &structs.DestinationConfig{Port: 9000, Addresses: []string{"kafka.test.com"}}, + } + assert.NoError(t, s.EnsureConfigEntry(27, configEntryDest)) + + idx, out, err = s.GatewayServices(ws, "gateway2", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(27)) + assert.Len(t, out, 3) + + expectWildcardIncludesDest := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("gateway2", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway2", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + { + Service: structs.NewServiceName("destination1", nil), + Gateway: structs.NewServiceName("gateway2", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + ServiceKind: structs.GatewayServiceKindDestination, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 27, + ModifyIndex: 27, + }, + }, + } + assert.ElementsMatch(t, expectWildcardIncludesDest, out) + + // Delete the destination. + assert.NoError(t, s.DeleteConfigEntry(28, structs.ServiceDefaults, "destination1", nil)) + + idx, out, err = s.GatewayServices(ws, "gateway2", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(28)) + assert.Len(t, out, 2) + assert.Equal(t, expect, out) + // Deleting the config entry should remove existing mappings - assert.Nil(t, s.DeleteConfigEntry(27, "terminating-gateway", "gateway", nil)) + assert.Nil(t, s.DeleteConfigEntry(29, "terminating-gateway", "gateway", nil)) assert.True(t, watchFired(ws)) idx, out, err = s.GatewayServices(ws, "gateway", nil) assert.Nil(t, err) - assert.Equal(t, idx, uint64(27)) + assert.Equal(t, idx, uint64(29)) assert.Len(t, out, 0) } diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index f5b15115c3..97b7e3f281 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -374,6 +374,9 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } + if err := cleanupGatewayWildcards(tx, idx, serviceName, true); err != nil { + return fmt.Errorf("failed to cleanup gateway mapping: \"%s\"; err: %v", serviceName, err) + } if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 78140d021b..2731899331 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -372,7 +372,7 @@ func TestStore_ServiceDefaults_Kind_Destination(t *testing.T) { _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) require.NoError(t, err) require.Len(t, gatewayServices, 1) - require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindUnknown) + require.Equal(t, structs.GatewayServiceKindUnknown, gatewayServices[0].ServiceKind) _, kindServices, err = s.ServiceNamesOfKind(ws, structs.ServiceKindDestination) require.NoError(t, err) @@ -710,13 +710,141 @@ func TestStore_ServiceDefaults_Kind_Destination_Wildcard(t *testing.T) { require.NoError(t, s.DeleteConfigEntry(6, structs.ServiceDefaults, destination.Name, &destination.EnterpriseMeta)) - //Watch is fired because we transitioned to a destination, by default we assume it's not. + // Watch is fired because we deleted the destination - now the mapping should be gone. require.True(t, watchFired(ws)) _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) require.NoError(t, err) - require.Len(t, gatewayServices, 1) - require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindUnknown) + require.Len(t, gatewayServices, 0) + + t.Run("delete service instance before config entry", func(t *testing.T) { + // Set up a service with both a real instance and destination from a config entry. + require.NoError(t, s.EnsureNode(7, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.NoError(t, s.EnsureService(8, "foo", &structs.NodeService{ID: "dest2", Service: "dest2", Tags: nil, Address: "", Port: 5000})) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + require.Equal(t, structs.GatewayServiceKindService, gatewayServices[0].ServiceKind) + + // Register destination; shouldn't change the gateway mapping. + destination2 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "dest2", + Destination: &structs.DestinationConfig{}, + } + require.NoError(t, s.EnsureConfigEntry(9, destination2)) + require.False(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + expected := structs.GatewayServices{ + { + Service: structs.NewServiceName("dest2", nil), + Gateway: structs.NewServiceName("Gtwy1", nil), + ServiceKind: structs.GatewayServiceKindService, + GatewayKind: structs.ServiceKindTerminatingGateway, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + } + require.Equal(t, expected, gatewayServices) + + // Delete the service, mapping should still exist. + require.NoError(t, s.DeleteService(10, "foo", "dest2", nil, "")) + require.False(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + require.Equal(t, expected, gatewayServices) + + // Delete the config entry, mapping should be gone. + require.NoError(t, s.DeleteConfigEntry(11, structs.ServiceDefaults, "dest2", &destination.EnterpriseMeta)) + require.True(t, watchFired(ws)) + + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Empty(t, gatewayServices) + }) + + t.Run("delete config entry before service instance", func(t *testing.T) { + // Set up a service with both a real instance and destination from a config entry. + destination2 := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "dest2", + Destination: &structs.DestinationConfig{}, + } + require.NoError(t, s.EnsureConfigEntry(7, destination2)) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + expected := structs.GatewayServices{ + { + Service: structs.NewServiceName("dest2", nil), + Gateway: structs.NewServiceName("Gtwy1", nil), + ServiceKind: structs.GatewayServiceKindDestination, + GatewayKind: structs.ServiceKindTerminatingGateway, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 7, + ModifyIndex: 7, + }, + }, + } + require.Equal(t, expected, gatewayServices) + + // Register service, only ServiceKind should have changed on the gateway mapping. + require.NoError(t, s.EnsureNode(8, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.NoError(t, s.EnsureService(9, "foo", &structs.NodeService{ID: "dest2", Service: "dest2", Tags: nil, Address: "", Port: 5000})) + require.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + expected = structs.GatewayServices{ + { + Service: structs.NewServiceName("dest2", nil), + Gateway: structs.NewServiceName("Gtwy1", nil), + ServiceKind: structs.GatewayServiceKindService, + GatewayKind: structs.ServiceKindTerminatingGateway, + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 7, + ModifyIndex: 9, + }, + }, + } + require.Equal(t, expected, gatewayServices) + + // Delete the config entry, mapping should still exist. + require.NoError(t, s.DeleteConfigEntry(10, structs.ServiceDefaults, "dest2", &destination.EnterpriseMeta)) + require.False(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Len(t, gatewayServices, 1) + require.Equal(t, expected, gatewayServices) + + // Delete the service, mapping should be gone. + require.NoError(t, s.DeleteService(11, "foo", "dest2", nil, "")) + require.True(t, watchFired(ws)) + + _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) + require.NoError(t, err) + require.Empty(t, gatewayServices) + }) } func TestStore_Service_TerminatingGateway_Kind_Service_Wildcard(t *testing.T) { @@ -774,74 +902,6 @@ func TestStore_Service_TerminatingGateway_Kind_Service_Wildcard(t *testing.T) { require.Len(t, gatewayServices, 0) } -func TestStore_Service_TerminatingGateway_Kind_Service_Destination_Wildcard(t *testing.T) { - s := testConfigStateStore(t) - - Gtwy := &structs.TerminatingGatewayConfigEntry{ - Kind: structs.TerminatingGateway, - Name: "Gtwy1", - Services: []structs.LinkedService{ - { - Name: "*", - }, - }, - } - - // Create - require.NoError(t, s.EnsureConfigEntry(0, Gtwy)) - - service := &structs.NodeService{ - Kind: structs.ServiceKindTypical, - Service: "web", - } - destination := &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "web", - Destination: &structs.DestinationConfig{}, - } - - _, gatewayServices, err := s.GatewayServices(nil, "Gtwy1", nil) - require.NoError(t, err) - require.Len(t, gatewayServices, 0) - - ws := memdb.NewWatchSet() - _, _, err = s.GatewayServices(ws, "Gtwy1", nil) - require.NoError(t, err) - - // Create - require.NoError(t, s.EnsureConfigEntry(0, destination)) - - _, gatewayServices, err = s.GatewayServices(nil, "Gtwy1", nil) - require.NoError(t, err) - require.Len(t, gatewayServices, 1) - require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindDestination) - - require.NoError(t, s.EnsureNode(0, &structs.Node{Node: "node1"})) - require.NoError(t, s.EnsureService(0, "node1", service)) - - //Watch is fired because we transitioned to a destination, by default we assume it's not. - require.True(t, watchFired(ws)) - - _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) - require.NoError(t, err) - require.Len(t, gatewayServices, 1) - require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindService) - - ws = memdb.NewWatchSet() - _, _, err = s.GatewayServices(ws, "Gtwy1", nil) - require.NoError(t, err) - - require.NoError(t, s.DeleteService(6, "node1", service.ID, &service.EnterpriseMeta, "")) - - //Watch is fired because we transitioned to a destination, by default we assume it's not. - require.True(t, watchFired(ws)) - - _, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil) - require.NoError(t, err) - require.Len(t, gatewayServices, 0) - -} - func TestStore_ConfigEntry_GraphValidation(t *testing.T) { ensureConfigEntry := func(s *Store, idx uint64, entry structs.ConfigEntry) error { if err := entry.Normalize(); err != nil {