From 3655802fdc887239b8cc3e7ee79657b7fb13992c Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Thu, 21 Jul 2022 13:38:28 +0100 Subject: [PATCH] proxycfg-glue: server-local implementation of `PeeredUpstreams` This is the OSS portion of enterprise PR 2352. It adds a server-local implementation of the proxycfg.PeeredUpstreams interface based on a blocking query against the server's state store. It also fixes an omission in the Virtual IP freeing logic where we were never updating the max index (and therefore blocking queries against VirtualIPsForAllImportedServices would not return on service deletion). --- agent/agent.go | 1 + agent/consul/state/catalog.go | 9 +- agent/proxycfg-glue/glue.go | 7 +- agent/proxycfg-glue/peered_upstreams.go | 55 ++++++++++++ agent/proxycfg-glue/peered_upstreams_test.go | 88 ++++++++++++++++++++ 5 files changed, 152 insertions(+), 8 deletions(-) create mode 100644 agent/proxycfg-glue/peered_upstreams.go create mode 100644 agent/proxycfg-glue/peered_upstreams_test.go diff --git a/agent/agent.go b/agent/agent.go index 4b78b69a96..751e62d4bc 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4251,6 +4251,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps) + sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps) sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache)) sources.TrustBundle = proxycfgglue.ServerTrustBundle(deps) sources.TrustBundleList = proxycfgglue.ServerTrustBundleList(deps) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 622cccd358..849d0820c3 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1990,7 +1990,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } } psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: name} - if err := freeServiceVirtualIP(tx, psn, nil); err != nil { + if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil { return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err) } if err := cleanupKindServiceName(tx, idx, svc.CompoundServiceName(), svc.ServiceKind); err != nil { @@ -2008,6 +2008,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st // is removed. func freeServiceVirtualIP( tx WriteTxn, + idx uint64, psn structs.PeeredServiceName, excludeGateway *structs.ServiceName, ) error { @@ -2059,6 +2060,10 @@ func freeServiceVirtualIP( return fmt.Errorf("failed updating freed virtual IP table: %v", err) } + if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil { + return err + } + return nil } @@ -3497,7 +3502,7 @@ func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.T } if len(nodes) == 0 { psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: sn} - if err := freeServiceVirtualIP(tx, psn, &gatewayName); err != nil { + if err := freeServiceVirtualIP(tx, idx, psn, &gatewayName); err != nil { return err } } diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 7c3311f363..1e254f4065 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -28,6 +28,7 @@ type Store interface { PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) + VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error) } // CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from @@ -90,12 +91,6 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate { return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName} } -// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface -// by sourcing data from the agent cache. -func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams { - return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName} -} - // CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by // sourcing data from the agent cache. func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery { diff --git a/agent/proxycfg-glue/peered_upstreams.go b/agent/proxycfg-glue/peered_upstreams.go new file mode 100644 index 0000000000..4d3e85f81d --- /dev/null +++ b/agent/proxycfg-glue/peered_upstreams.go @@ -0,0 +1,55 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul/watch" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" +) + +// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface +// by sourcing data from the agent cache. +func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams { + return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName} +} + +// ServerPeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface by +// sourcing data from a blocking query against the server's state store. +func ServerPeeredUpstreams(deps ServerDataSourceDeps) proxycfg.PeeredUpstreams { + return &serverPeeredUpstreams{deps} +} + +type serverPeeredUpstreams struct { + deps ServerDataSourceDeps +} + +func (s *serverPeeredUpstreams) Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + // TODO(peering): ACL filtering. + return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore, + func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedPeeredServiceList, error) { + index, vips, err := store.VirtualIPsForAllImportedServices(ws, req.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + + result := make([]structs.PeeredServiceName, 0, len(vips)) + for _, vip := range vips { + result = append(result, vip.Service) + } + + return index, &structs.IndexedPeeredServiceList{ + Services: result, + QueryMeta: structs.QueryMeta{ + Index: index, + Backend: structs.QueryBackendBlocking, + }, + }, nil + }, + dispatchBlockingQueryUpdate[*structs.IndexedPeeredServiceList](ch), + ) +} diff --git a/agent/proxycfg-glue/peered_upstreams_test.go b/agent/proxycfg-glue/peered_upstreams_test.go new file mode 100644 index 0000000000..c2faa44da8 --- /dev/null +++ b/agent/proxycfg-glue/peered_upstreams_test.go @@ -0,0 +1,88 @@ +package proxycfgglue + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestServerPeeredUpstreams(t *testing.T) { + const ( + index uint64 = 123 + nodeName = "node-1" + ) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := state.NewStateStore(nil) + enableVirtualIPs(t, store) + + registerService := func(t *testing.T, index uint64, peerName, serviceName string) { + require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{Service: serviceName, ID: serviceName}, + PeerName: peerName, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + })) + + require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{ + Service: fmt.Sprintf("%s-proxy", serviceName), + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: serviceName, + }, + }, + PeerName: peerName, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + })) + } + + registerService(t, index, "peer-1", "web") + + eventCh := make(chan proxycfg.UpdateEvent) + dataSource := ServerPeeredUpstreams(ServerDataSourceDeps{ + GetStore: func() Store { return store }, + }) + require.NoError(t, dataSource.Notify(ctx, &structs.PartitionSpecificRequest{EnterpriseMeta: *acl.DefaultEnterpriseMeta()}, "", eventCh)) + + testutil.RunStep(t, "initial state", func(t *testing.T) { + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 1) + require.Equal(t, "peer-1", result.Services[0].Peer) + require.Equal(t, "web", result.Services[0].ServiceName.Name) + }) + + testutil.RunStep(t, "register another service", func(t *testing.T) { + registerService(t, index+1, "peer-2", "db") + + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 2) + }) + + testutil.RunStep(t, "deregister service", func(t *testing.T) { + require.NoError(t, store.DeleteService(index+2, nodeName, "web", acl.DefaultEnterpriseMeta(), "peer-1")) + + result := getEventResult[*structs.IndexedPeeredServiceList](t, eventCh) + require.Len(t, result.Services, 1) + }) +} + +func enableVirtualIPs(t *testing.T, store *state.Store) { + t.Helper() + + require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{ + Key: structs.SystemMetadataVirtualIPsEnabled, + Value: "true", + })) +}