From a8df87f574d9b33064b6af04a1336f47c7529dba Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 22 Jul 2022 11:52:05 +0100 Subject: [PATCH] proxycfg-glue: server-local implementation of `ExportedPeeredServices` This is the OSS portion of enterprise PR 2377. Adds a server-local implementation of the proxycfg.ExportedPeeredServices interface that sources data from a blocking query against the server's state store. --- agent/agent.go | 1 + .../proxycfg-glue/exported_peered_services.go | 60 ++++++++++ .../exported_peered_services_test.go | 112 ++++++++++++++++++ agent/proxycfg-glue/glue.go | 7 +- agent/proxycfg-glue/helpers_test.go | 8 ++ 5 files changed, 182 insertions(+), 6 deletions(-) create mode 100644 agent/proxycfg-glue/exported_peered_services.go create mode 100644 agent/proxycfg-glue/exported_peered_services_test.go diff --git a/agent/agent.go b/agent/agent.go index 751e62d4bc..a7c89a7270 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4246,6 +4246,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps) sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps) sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache)) + sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps) sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps) sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps) sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) diff --git a/agent/proxycfg-glue/exported_peered_services.go b/agent/proxycfg-glue/exported_peered_services.go new file mode 100644 index 0000000000..3ce8db6322 --- /dev/null +++ b/agent/proxycfg-glue/exported_peered_services.go @@ -0,0 +1,60 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul/watch" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/structs/aclfilter" +) + +// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices +// interface by sourcing data from the agent cache. +func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName} +} + +// ServerExportedPeeredServices satisifies the proxycfg.ExportedPeeredServices +// interface by sourcing data from a blocking query against the server's state +// store. +func ServerExportedPeeredServices(deps ServerDataSourceDeps) proxycfg.ExportedPeeredServices { + return &serverExportedPeeredServices{deps} +} + +type serverExportedPeeredServices struct { + deps ServerDataSourceDeps +} + +func (s *serverExportedPeeredServices) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore, + func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedExportedServiceList, error) { + // TODO(peering): acls: mesh gateway needs appropriate wildcard service:read + authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil) + if err != nil { + return 0, nil, err + } + + index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + + result := &structs.IndexedExportedServiceList{ + Services: serviceMap, + QueryMeta: structs.QueryMeta{ + Backend: structs.QueryBackendBlocking, + Index: index, + }, + } + aclfilter.New(authz, s.deps.Logger).Filter(result) + + return index, result, nil + }, + dispatchBlockingQueryUpdate[*structs.IndexedExportedServiceList](ch), + ) +} diff --git a/agent/proxycfg-glue/exported_peered_services_test.go b/agent/proxycfg-glue/exported_peered_services_test.go new file mode 100644 index 0000000000..f0b41d9f30 --- /dev/null +++ b/agent/proxycfg-glue/exported_peered_services_test.go @@ -0,0 +1,112 @@ +package proxycfgglue + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" +) + +func TestServerExportedPeeredServices(t *testing.T) { + nextIndex := indexGenerator() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := state.NewStateStore(nil) + + for _, peer := range []string{"peer-1", "peer-2", "peer-3"} { + require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.Peering{ + ID: testUUID(t), + Name: peer, + State: pbpeering.PeeringState_ACTIVE, + })) + } + + require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "web", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-1"}, + }, + }, + { + Name: "db", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-2"}, + }, + }, + }, + })) + + authz := policyAuthorizer(t, ` + service "web" { policy = "read" } + service "api" { policy = "read" } + service "db" { policy = "deny" } + `) + + eventCh := make(chan proxycfg.UpdateEvent) + dataSource := ServerExportedPeeredServices(ServerDataSourceDeps{ + GetStore: func() Store { return store }, + ACLResolver: newStaticResolver(authz), + }) + require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{}, "", eventCh)) + + t.Run("initial state", func(t *testing.T) { + result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh) + require.Equal(t, + map[string]structs.ServiceList{ + "peer-1": {structs.NewServiceName("web", nil)}, + }, + result.Services, + ) + }) + + t.Run("update exported services", func(t *testing.T) { + require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "web", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-1"}, + }, + }, + { + Name: "db", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-2"}, + }, + }, + { + Name: "api", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-1"}, + {PeerName: "peer-3"}, + }, + }, + }, + })) + + result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh) + require.Equal(t, + map[string]structs.ServiceList{ + "peer-1": { + structs.NewServiceName("api", nil), + structs.NewServiceName("web", nil), + }, + "peer-3": { + structs.NewServiceName("api", nil), + }, + }, + result.Services, + ) + }) +} diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 1e254f4065..04451c3d20 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -21,6 +21,7 @@ import ( type Store interface { watch.StateStore + ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error) IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error) @@ -103,12 +104,6 @@ func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig { return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName} } -// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices -// interface by sourcing data from the agent cache. -func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices { - return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName} -} - // cacheProxyDataSource implements a generic wrapper around the agent cache to // provide data to the proxycfg.Manager. type cacheProxyDataSource[ReqType cache.Request] struct { diff --git a/agent/proxycfg-glue/helpers_test.go b/agent/proxycfg-glue/helpers_test.go index 7e3b9078e7..7a0c67df11 100644 --- a/agent/proxycfg-glue/helpers_test.go +++ b/agent/proxycfg-glue/helpers_test.go @@ -9,6 +9,14 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" ) +func indexGenerator() func() uint64 { + var idx uint64 + return func() uint64 { + idx++ + return idx + } +} + func getEventResult[ResultType any](t *testing.T, eventCh <-chan proxycfg.UpdateEvent) ResultType { t.Helper()