diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 2f796c6914..79162603ac 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -31,6 +31,10 @@ var ( // assignment to be enabled. minVirtualIPVersion = version.Must(version.NewVersion("1.11.0")) + // minVirtualIPVersion is the minimum version for all Consul servers for virtual IP + // assignment to be enabled for terminating gateways. + minVirtualIPTerminatingGatewayVersion = version.Must(version.NewVersion("1.11.2")) + // virtualIPVersionCheckInterval is the frequency we check whether all servers meet // the minimum version to enable virtual IP assignment for services. virtualIPVersionCheckInterval = time.Minute @@ -125,7 +129,7 @@ func (s *Server) pruneCARoots() error { func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { // Return early if the flag is already set. - done, err := s.setVirtualIPVersionFlag() + done, err := s.setVirtualIPFlags() if err != nil { s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) } @@ -142,7 +146,7 @@ func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - done, err := s.setVirtualIPVersionFlag() + done, err := s.setVirtualIPFlags() if err != nil { s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err) continue @@ -154,6 +158,19 @@ func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error { } } +func (s *Server) setVirtualIPFlags() (bool, error) { + virtualIPFlag, err := s.setVirtualIPVersionFlag() + if err != nil { + return false, err + } + terminatingGatewayVirtualIPFlag, err := s.setVirtualIPTerminatingGatewayVersionFlag() + if err != nil { + return false, err + } + + return virtualIPFlag && terminatingGatewayVirtualIPFlag, nil +} + func (s *Server) setVirtualIPVersionFlag() (bool, error) { val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) if err != nil { @@ -175,6 +192,27 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) { return true, nil } +func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) { + val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled) + if err != nil { + return false, err + } + if val != "" { + return true, nil + } + + if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPTerminatingGatewayVersion); !ok { + return false, fmt.Errorf("can't allocate Virtual IPs for terminating gateways until all servers >= %s", + minVirtualIPTerminatingGatewayVersion.String()) + } + + if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil { + return false, nil + } + + return true, nil +} + // retryLoopBackoff loops a given function indefinitely, backing off exponentially // upon errors up to a maximum of maxRetryBackoff seconds. func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) { diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 300e50652d..d7f6816228 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -2134,7 +2134,7 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { c.Bootstrap = false c.BootstrapExpect = 3 c.Datacenter = "dc1" - c.Build = "1.11.0" + c.Build = "1.11.2" } dir1, s1 := testServerWithConfig(t, conf) defer os.RemoveAll(dir1) @@ -2163,6 +2163,10 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { _, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled) require.NoError(t, err) require.Nil(t, entry) + state = s1.fsm.State() + _, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled) + require.NoError(t, err) + require.Nil(t, entry) // Register a connect-native service and make sure we don't have a virtual IP yet. err = state.EnsureRegistration(10, &structs.RegisterRequest{ @@ -2181,6 +2185,35 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { require.NoError(t, err) require.Equal(t, "", vip) + // Register a terminating gateway. + err = state.EnsureRegistration(11, &structs.RegisterRequest{ + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + Service: "tgate1", + ID: "tgate1", + Kind: structs.ServiceKindTerminatingGateway, + }, + }) + require.NoError(t, err) + + err = state.EnsureConfigEntry(12, &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "bar", + }, + }, + }) + require.NoError(t, err) + + // Make sure the service referenced in the terminating gateway config doesn't have + // a virtual IP yet. + vip, err = state.VirtualIPForService(structs.NewServiceName("bar", nil)) + require.NoError(t, err) + require.Equal(t, "", vip) + // Leave s3 and wait for the version to get updated. require.NoError(t, s3.Leave()) retry.Run(t, func(r *retry.R) { @@ -2188,6 +2221,10 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { require.NoError(r, err) require.NotNil(r, entry) require.Equal(r, "true", entry.Value) + _, entry, err = state.SystemMetadataGet(nil, structs.SystemMetadataTermGatewayVirtualIPsEnabled) + require.NoError(r, err) + require.NotNil(r, entry) + require.Equal(r, "true", entry.Value) }) // Update the connect-native service - now there should be a virtual IP assigned. @@ -2206,6 +2243,34 @@ func TestLeader_EnableVirtualIPs(t *testing.T) { vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil)) require.NoError(t, err) require.Equal(t, "240.0.0.1", vip) + + // Update the terminating gateway config entry - now there should be a virtual IP assigned. + err = state.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "api", + }, + { + Name: "baz", + }, + }, + }) + require.NoError(t, err) + + _, node, err := state.NodeService("bar", "tgate1", nil) + require.NoError(t, err) + sn := structs.ServiceName{Name: "api"} + key := structs.ServiceGatewayVirtualIPTag(sn) + require.Contains(t, node.TaggedAddresses, key) + require.Equal(t, node.TaggedAddresses[key].Address, "240.0.0.1") + + // Make sure the baz service (only referenced in the config entry so far) + // has a virtual IP. + vip, err = state.VirtualIPForService(structs.NewServiceName("baz", nil)) + require.NoError(t, err) + require.Equal(t, "240.0.0.2", vip) } func TestLeader_ACL_Initialization_AnonymousToken(t *testing.T) { diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 8f4b07c2e4..90657d841e 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -787,6 +787,32 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool } } + // If there's a terminating gateway config entry for this service, populate the tagged addresses + // with virtual IP mappings. + termGatewayVIPsSupported, err := terminatingGatewayVirtualIPsSupported(tx, nil) + if err != nil { + return err + } + if termGatewayVIPsSupported && svc.Kind == structs.ServiceKindTerminatingGateway { + _, conf, err := configEntryTxn(tx, nil, structs.TerminatingGateway, svc.Service, &svc.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to retrieve terminating gateway config: %s", err) + } + if conf != nil { + termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry) + addrs, err := getTermGatewayVirtualIPs(tx, termGatewayConf.Services, &svc.EnterpriseMeta) + if err != nil { + return err + } + if svc.TaggedAddresses == nil { + svc.TaggedAddresses = make(map[string]structs.ServiceAddress) + } + for key, addr := range addrs { + svc.TaggedAddresses[key] = addr + } + } + } + // Create the service node entry and populate the indexes. Note that // conversion doesn't populate any of the node-specific information. // That's always populated when we read from the state store. @@ -939,6 +965,18 @@ func virtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) { return entry.Value != "", nil } +func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) { + _, entry, err := systemMetadataGetTxn(tx, ws, structs.SystemMetadataTermGatewayVirtualIPsEnabled) + if err != nil { + return false, fmt.Errorf("failed system metadata lookup: %s", err) + } + if entry == nil { + return false, nil + } + + return entry.Value != "", nil +} + // Services returns all services along with a list of associated tags. func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) { tx := s.db.Txn(false) @@ -1697,7 +1735,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) } - if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil { + if err := freeServiceVirtualIP(tx, svc.ServiceName, nil, entMeta); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) } if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { @@ -1713,7 +1751,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st // freeServiceVirtualIP is used to free a virtual IP for a service after the last instance // is removed. -func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMeta) error { +func freeServiceVirtualIP(tx WriteTxn, svc string, excludeGateway *structs.ServiceName, entMeta *structs.EnterpriseMeta) error { supported, err := virtualIPsSupported(tx, nil) if err != nil { return err @@ -1722,7 +1760,28 @@ func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMe return nil } + // Don't deregister the virtual IP if at least one terminating gateway still references this service. sn := structs.NewServiceName(svc, entMeta) + termGatewaySupported, err := terminatingGatewayVirtualIPsSupported(tx, nil) + if err != nil { + return err + } + if termGatewaySupported { + svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) + if err != nil { + return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err) + } + + for service := svcGateways.Next(); service != nil; service = svcGateways.Next() { + if svc, ok := service.(*structs.GatewayService); ok && svc != nil { + ignoreGateway := excludeGateway == nil || !svc.Gateway.Matches(*excludeGateway) + if ignoreGateway && svc.GatewayKind == structs.ServiceKindTerminatingGateway { + return nil + } + } + } + } + serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn) if err != nil { return fmt.Errorf("failed service virtual IP lookup: %s", err) @@ -2862,6 +2921,18 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en return err } + // Update terminating gateway service virtual IPs + vipsSupported, err := terminatingGatewayVirtualIPsSupported(tx, nil) + if err != nil { + return err + } + if vipsSupported && conf.GetKind() == structs.TerminatingGateway { + gatewayConf := conf.(*structs.TerminatingGatewayConfigEntry) + if err := updateTerminatingGatewayVirtualIPs(tx, idx, gatewayConf, entMeta); err != nil { + return err + } + } + // Delete all associated with gateway first, to avoid keeping mappings that were removed sn := structs.NewServiceName(conf.GetName(), entMeta) @@ -2899,6 +2970,96 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en return nil } +func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, entMeta *structs.EnterpriseMeta) (map[string]structs.ServiceAddress, error) { + addrs := make(map[string]structs.ServiceAddress, len(services)) + for _, s := range services { + sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta} + vip, err := assignServiceVirtualIP(tx, sn) + if err != nil { + return nil, err + } + key := structs.ServiceGatewayVirtualIPTag(sn) + addrs[key] = structs.ServiceAddress{Address: vip} + } + + return addrs, nil +} + +func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *structs.EnterpriseMeta) error { + // Build the current map of services with virtual IPs for this gateway + services := conf.Services + addrs, err := getTermGatewayVirtualIPs(tx, services, entMeta) + if err != nil { + return err + } + + // Find any deleted service entries by comparing the new config entry to the existing one. + _, existing, err := configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta) + if err != nil { + return fmt.Errorf("failed to get config entry: %v", err) + } + var deletes []structs.ServiceName + cfg, ok := existing.(*structs.TerminatingGatewayConfigEntry) + if ok { + for _, s := range cfg.Services { + sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta} + key := structs.ServiceGatewayVirtualIPTag(sn) + if _, ok := addrs[key]; !ok { + deletes = append(deletes, sn) + } + } + } + + q := Query{Value: conf.GetName(), EnterpriseMeta: *entMeta} + _, svcNodes, err := serviceNodesTxn(tx, nil, indexService, q) + if err != nil { + return err + } + + // Update the tagged addrs for any existing instances of this terminating gateway. + for _, s := range svcNodes { + newAddrs := make(map[string]structs.ServiceAddress) + for key, addr := range s.ServiceTaggedAddresses { + if !strings.HasPrefix(key, structs.TaggedAddressVirtualIP+":") { + newAddrs[key] = addr + } + } + for key, addr := range addrs { + newAddrs[key] = addr + } + + // Don't need to update the service record if it's a no-op. + if reflect.DeepEqual(newAddrs, s.ServiceTaggedAddresses) { + continue + } + + newSN := s.PartialClone() + newSN.ServiceTaggedAddresses = newAddrs + newSN.ModifyIndex = idx + if err := catalogInsertService(tx, newSN); err != nil { + return err + } + } + + // Check if we can delete any virtual IPs for the removed services. + gatewayName := structs.NewServiceName(conf.GetName(), entMeta) + for _, sn := range deletes { + // If there's no existing service nodes, attempt to free the virtual IP. + q := Query{Value: sn.Name, EnterpriseMeta: sn.EnterpriseMeta} + _, nodes, err := serviceNodesTxn(tx, nil, indexConnect, q) + if err != nil { + return err + } + if len(nodes) == 0 { + if err := freeServiceVirtualIP(tx, sn.Name, &gatewayName, &sn.EnterpriseMeta); err != nil { + return err + } + } + } + + return nil +} + // ingressConfigGatewayServices constructs a list of GatewayService structs for // insertion into the memdb table, specific to ingress gateways. The boolean // returned indicates that there are no changes necessary to the memdb table. diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index c0c2fecde4..7ee4e4dce4 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -315,8 +315,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event events = append(events, e) } - for gatewayName, serviceChanges := range termGatewayChanges { - for serviceName, gsChange := range serviceChanges { + for gatewayName, svcChanges := range termGatewayChanges { + for serviceName, gsChange := range svcChanges { gs := changeObject(gsChange.change).(*structs.GatewayService) q := Query{ @@ -355,6 +355,12 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // Build service events and append them for _, sn := range nodes { tuple := newNodeServiceTupleFromServiceNode(sn) + + // If we're already sending an event for the service, don't send another. + if _, ok := serviceChanges[tuple]; ok { + continue + } + e, err := newServiceHealthEventForService(tx, changes.Index, tuple) if err != nil { return nil, err diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 41d5a0961a..9becf5bcb3 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -73,10 +73,7 @@ func TestServiceHealthSnapshot(t *testing.T) { func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { store := NewStateStore(nil) - require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{ - Key: structs.SystemMetadataVirtualIPsEnabled, - Value: "true", - })) + setVirtualIPFlags(t, store) counter := newIndexCounter() err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db")) @@ -1101,28 +1098,34 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { WantEvents: []stream.Event{ testServiceHealthEvent(t, "tgate1", - evServiceTermingGateway("tgate1")), + evServiceTermingGateway("tgate1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), testServiceHealthEvent(t, "tgate1", evConnectTopic, - evServiceTermingGateway("srv1")), + evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), testServiceHealthEvent(t, "tgate1", evConnectTopic, - evServiceTermingGateway("srv2")), + evServiceTermingGateway("srv2"), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), testServiceHealthEvent(t, "tgate1", evServiceTermingGateway("tgate1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNode2), testServiceHealthEvent(t, "tgate1", evConnectTopic, evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNode2), testServiceHealthEvent(t, "tgate1", evConnectTopic, evServiceTermingGateway("srv2"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNode2), }, }) @@ -1161,6 +1164,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { testServiceHealthEvent(t, "tgate1", evServiceTermingGateway("tgate1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNodeCheckFail, evNodeUnchanged, evNodeChecksMutated, @@ -1169,6 +1173,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { "tgate1", evConnectTopic, evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNodeCheckFail, evNodeUnchanged, evNodeChecksMutated, @@ -1177,6 +1182,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { "tgate1", evConnectTopic, evServiceTermingGateway("srv2"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), evNodeCheckFail, evNodeUnchanged, evNodeChecksMutated, @@ -1208,16 +1214,26 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), testServiceHealthEvent(t, "tgate1", evConnectTopic, evServiceTermingGateway("srv1"), - evServiceIndex(setupIndex)), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), testServiceHealthEvent(t, "tgate1", evConnectTopic, evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), }, }) run(t, eventsTestCase{ @@ -1260,11 +1276,26 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), testServiceHealthEvent(t, "tgate1", evConnectTopic, evServiceTermingGateway("srv2"), - evServiceIndex(setupIndex)), + evTerminatingGatewayVirtualIPs("srv1", "srv2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), }, }) run(t, eventsTestCase{ @@ -1307,10 +1338,25 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway(""), + evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), testServiceHealthDeregistrationEvent(t, "tgate1", evConnectTopic, - evServiceTermingGateway("srv1")), + evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"), + evServiceMutatedModifyIndex), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evTerminatingGatewayVirtualIP("srv2", "240.0.0.2"), + evServiceIndex(setupIndex), + evServiceMutatedModifyIndex), }, }) run(t, eventsTestCase{ @@ -1327,12 +1373,12 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) if err != nil { return err } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, Mutate: func(s *Store, tx *txn) error { configEntry := &structs.TerminatingGatewayConfigEntry{ @@ -1466,6 +1512,12 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { run(t, eventsTestCase{ Name: "rename a terminating gateway instance", Setup: func(s *Store, tx *txn) error { + err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err != nil { + return err + } + configEntry := &structs.TerminatingGatewayConfigEntry{ Kind: structs.TerminatingGateway, Name: "tgate1", @@ -1477,7 +1529,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), } - err := ensureConfigEntryTxn(tx, tx.Index, configEntry) + err = ensureConfigEntryTxn(tx, tx.Index, configEntry) if err != nil { return err } @@ -1492,12 +1544,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), } - err = ensureConfigEntryTxn(tx, tx.Index, configEntry) - if err != nil { - return err - } - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + return ensureConfigEntryTxn(tx, tx.Index, configEntry) }, Mutate: func(s *Store, tx *txn) error { rename := func(req *structs.RegisterRequest) error { @@ -1511,14 +1558,16 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { WantEvents: []stream.Event{ testServiceHealthDeregistrationEvent(t, "tgate1", - evServiceTermingGateway("tgate1")), + evServiceTermingGateway(""), + evTerminatingGatewayVirtualIPs("srv1")), testServiceHealthEvent(t, "tgate1", evServiceTermingGateway(""), evNodeUnchanged, evServiceMutated, evServiceChecksMutated, - evTerminatingGatewayRenamed("tgate2")), + evTerminatingGatewayRenamed("tgate2"), + evTerminatingGatewayVirtualIPs("srv1")), testServiceHealthDeregistrationEvent(t, "tgate1", evConnectTopic, @@ -1564,15 +1613,18 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { WantEvents: []stream.Event{ testServiceHealthDeregistrationEvent(t, "tgate1", - evServiceTermingGateway("")), + evServiceTermingGateway(""), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), testServiceHealthDeregistrationEvent(t, "tgate1", evConnectTopic, - evServiceTermingGateway("srv1")), + evServiceTermingGateway("srv1"), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), testServiceHealthDeregistrationEvent(t, "tgate1", evConnectTopic, - evServiceTermingGateway("srv2")), + evServiceTermingGateway("srv2"), + evTerminatingGatewayVirtualIPs("srv1", "srv2")), }, }) } @@ -1583,6 +1635,10 @@ func (tc eventsTestCase) run(t *testing.T) { Key: structs.SystemMetadataVirtualIPsEnabled, Value: "true", })) + require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{ + Key: structs.SystemMetadataTermGatewayVirtualIPsEnabled, + Value: "true", + })) setupIndex := uint64(10) mutateIndex := uint64(100) @@ -1636,6 +1692,14 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { csn.Service.Kind = structs.ServiceKindTerminatingGateway csn.Service.Port = 22000 + sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta) + key := structs.ServiceGatewayVirtualIPTag(sn) + if name != "" && name != csn.Service.Service { + csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{ + key: {Address: "240.0.0.1"}, + } + } + if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) payload.overrideKey = name @@ -1645,6 +1709,40 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { } } +func evTerminatingGatewayVirtualIP(name, addr string) func(e *stream.Event) error { + return func(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + + sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta) + key := structs.ServiceGatewayVirtualIPTag(sn) + csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{ + key: {Address: addr}, + } + + return nil + } +} + +func evTerminatingGatewayVirtualIPs(names ...string) func(e *stream.Event) error { + return func(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + + if len(names) > 0 { + csn.Service.TaggedAddresses = make(map[string]structs.ServiceAddress) + } + for i, name := range names { + sn := structs.NewServiceName(name, &csn.Service.EnterpriseMeta) + key := structs.ServiceGatewayVirtualIPTag(sn) + + csn.Service.TaggedAddresses[key] = structs.ServiceAddress{ + Address: fmt.Sprintf("240.0.0.%d", i+1), + } + } + + return nil + } +} + func evServiceIndex(idx uint64) func(e *stream.Event) error { return func(e *stream.Event) error { payload := e.Payload.(EventPayloadCheckServiceNode) @@ -2040,6 +2138,11 @@ func evServiceMutated(e *stream.Event) error { return nil } +func evServiceMutatedModifyIndex(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.ModifyIndex = 100 + return nil +} + // evServiceChecksMutated option alters the base event service check to set it's // CreateIndex (but not modify index) to the setup index. This expresses that we // expect the service check records originally created in setup to have been diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 1071fea23f..b3a41855ab 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1551,10 +1551,7 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) { func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { assert := assert.New(t) s := testStateStore(t) - require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{ - Key: structs.SystemMetadataVirtualIPsEnabled, - Value: "true", - })) + setVirtualIPFlags(t, s) // Create the service registration. entMeta := structs.DefaultEnterpriseMetaInDefaultPartition() @@ -1687,10 +1684,7 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { assert := assert.New(t) s := testStateStore(t) - require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{ - Key: structs.SystemMetadataVirtualIPsEnabled, - Value: "true", - })) + setVirtualIPFlags(t, s) // Create the service registration. entMeta := structs.DefaultEnterpriseMetaInDefaultPartition() @@ -7986,3 +7980,14 @@ func generateUUID() ([]byte, string) { buf[10:16]) return buf, uuid } + +func setVirtualIPFlags(t *testing.T, s *Store) { + require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{ + Key: structs.SystemMetadataVirtualIPsEnabled, + Value: "true", + })) + require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{ + Key: structs.SystemMetadataTermGatewayVirtualIPsEnabled, + Value: "true", + })) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index de6a177e95..cb84f77d99 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1999,6 +1999,10 @@ func (n ServiceName) ToServiceID() ServiceID { return ServiceID{ID: n.Name, EnterpriseMeta: n.EnterpriseMeta} } +func ServiceGatewayVirtualIPTag(sn ServiceName) string { + return fmt.Sprintf("%s:%s", TaggedAddressVirtualIP, sn.String()) +} + type ServiceList []ServiceName type IndexedServiceList struct { diff --git a/agent/structs/system_metadata.go b/agent/structs/system_metadata.go index dd90ba22cb..2c038872bf 100644 --- a/agent/structs/system_metadata.go +++ b/agent/structs/system_metadata.go @@ -25,10 +25,11 @@ type SystemMetadataRequest struct { } const ( - SystemMetadataIntentionFormatKey = "intention-format" - SystemMetadataIntentionFormatConfigValue = "config-entry" - SystemMetadataIntentionFormatLegacyValue = "legacy" - SystemMetadataVirtualIPsEnabled = "virtual-ips" + SystemMetadataIntentionFormatKey = "intention-format" + SystemMetadataIntentionFormatConfigValue = "config-entry" + SystemMetadataIntentionFormatLegacyValue = "legacy" + SystemMetadataVirtualIPsEnabled = "virtual-ips" + SystemMetadataTermGatewayVirtualIPsEnabled = "virtual-ips-term-gateway" ) type SystemMetadataEntry struct { diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 2dd9c2ca70..319161ff47 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -175,6 +175,15 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // We do not match on all endpoints here since it would lead to load balancing across // all instances when any instance address is dialed. for _, e := range endpoints { + if e.Service.Kind == structs.ServiceKind(structs.TerminatingGateway) { + key := structs.ServiceGatewayVirtualIPTag(chain.CompoundServiceName()) + + if vip := e.Service.TaggedAddresses[key]; vip.Address != "" { + uniqueAddrs[vip.Address] = struct{}{} + } + + continue + } if vip := e.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; vip.Address != "" { uniqueAddrs[vip.Address] = struct{}{} } diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index 7a3419a247..5c252fad38 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -1224,6 +1224,54 @@ func TestListenersFromSnapshot(t *testing.T) { } }, }, + { + name: "transparent-proxy-terminating-gateway", + create: proxycfg.TestConfigSnapshot, + setup: func(snap *proxycfg.ConfigSnapshot) { + snap.Proxy.Mode = structs.ProxyModeTransparent + + snap.ConnectProxy.MeshConfigSet = true + snap.ConnectProxy.MeshConfig = &structs.MeshConfigEntry{ + TransparentProxy: structs.TransparentProxyMeshConfig{ + MeshDestinationsOnly: true, + }, + } + + // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode + google := structs.NewServiceName("google", nil) + kafka := structs.NewServiceName("kafka", nil) + snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ + google.String(): {}, + kafka.String(): {}, + } + snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + + tgate := structs.CheckServiceNode{ + Node: &structs.Node{ + Address: "8.8.8.8", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + Service: "tgate1", + Kind: structs.ServiceKind(structs.TerminatingGateway), + Address: "9.9.9.9", + Port: 9090, + TaggedAddresses: map[string]structs.ServiceAddress{ + structs.ServiceGatewayVirtualIPTag(google): {Address: "10.0.0.1"}, + structs.ServiceGatewayVirtualIPTag(kafka): {Address: "10.0.0.2"}, + "virtual": {Address: "6.6.6.6"}, + }, + }, + } + snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + "google.default.default.dc1": {tgate}, + } + snap.ConnectProxy.WatchedUpstreamEndpoints[kafka.String()] = map[string]structs.CheckServiceNodes{ + "kafka.default.default.dc1": {tgate}, + } + }, + }, } latestEnvoyVersion := proxysupport.EnvoyVersions[0] diff --git a/agent/xds/testdata/listeners/transparent-proxy-terminating-gateway.envoy-1-20-x.golden b/agent/xds/testdata/listeners/transparent-proxy-terminating-gateway.envoy-1-20-x.golden new file mode 100644 index 0000000000..5267c85583 --- /dev/null +++ b/agent/xds/testdata/listeners/transparent-proxy-terminating-gateway.envoy-1-20-x.golden @@ -0,0 +1,177 @@ +{ + "versionInfo": "00000001", + "resources": [ + { + "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", + "name": "db:127.0.0.1:9191", + "address": { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 9191 + } + }, + "filterChains": [ + { + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "upstream.db.default.default.dc1", + "cluster": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + } + ], + "trafficDirection": "OUTBOUND" + }, + { + "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", + "name": "outbound_listener:127.0.0.1:15001", + "address": { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 15001 + } + }, + "filterChains": [ + { + "filterChainMatch": { + "prefixRanges": [ + { + "addressPrefix": "10.0.0.1", + "prefixLen": 32 + } + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "upstream.google.default.default.dc1", + "cluster": "google.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "prefixRanges": [ + { + "addressPrefix": "10.0.0.2", + "prefixLen": 32 + } + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "upstream.kafka.default.default.dc1", + "cluster": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + } + ], + "listenerFilters": [ + { + "name": "envoy.filters.listener.original_dst" + } + ], + "trafficDirection": "OUTBOUND" + }, + { + "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", + "name": "prepared_query:geo-cache:127.10.10.10:8181", + "address": { + "socketAddress": { + "address": "127.10.10.10", + "portValue": 8181 + } + }, + "filterChains": [ + { + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "upstream.prepared_query_geo-cache", + "cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + } + ], + "trafficDirection": "OUTBOUND" + }, + { + "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", + "name": "public_listener:0.0.0.0:9999", + "address": { + "socketAddress": { + "address": "0.0.0.0", + "portValue": 9999 + } + }, + "filterChains": [ + { + "filters": [ + { + "name": "envoy.filters.network.rbac", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.rbac.v3.RBAC", + "rules": { + + }, + "statPrefix": "connect_authz" + } + }, + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "public_listener", + "cluster": "local_app" + } + } + ], + "transportSocket": { + "name": "tls", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext", + "commonTlsContext": { + "tlsParams": { + + }, + "tlsCertificates": [ + { + "certificateChain": { + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n" + }, + "privateKey": { + "inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n" + } + } + ], + "validationContext": { + "trustedCa": { + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n" + } + } + }, + "requireClientCertificate": true + } + } + } + ], + "trafficDirection": "INBOUND" + } + ], + "typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener", + "nonce": "00000001" +} \ No newline at end of file