diff --git a/agent/consul/server.go b/agent/consul/server.go index 6c7ea98e06..0b8345947a 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -919,6 +919,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) { return s.getTrustDomain(caConfig) }, + LocalDatacenter: s.config.Datacenter, }) } diff --git a/internal/mesh/internal/cache/sidecarproxycache/cache.go b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go similarity index 85% rename from internal/mesh/internal/cache/sidecarproxycache/cache.go rename to internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go index edebcbf029..e2b04e0aa4 100644 --- a/internal/mesh/internal/cache/sidecarproxycache/cache.go +++ b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go @@ -10,13 +10,13 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -// Cache stores information needed for the sidecar-proxy controller to reconcile efficiently. +// DestinationsCache stores information needed for the sidecar-proxy controller to reconcile efficiently. // This currently means storing a list of all destinations for easy look up // as well as indices of source proxies where those destinations are referenced. // // It is the responsibility of the controller and its subcomponents (like mapper and data fetcher) // to keep this cache up-to-date as we're observing new data. 
-type Cache struct { +type DestinationsCache struct { lock sync.RWMutex // store is a map from destination service reference and port as a reference key @@ -30,8 +30,8 @@ type Cache struct { type storeKeys map[ReferenceKeyWithPort]struct{} -func New() *Cache { - return &Cache{ +func NewDestinationsCache() *DestinationsCache { + return &DestinationsCache{ store: make(map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef), sourceProxiesIndex: make(map[resource.ReferenceKey]storeKeys), } @@ -48,7 +48,7 @@ func KeyFromRefAndPort(ref *pbresource.Reference, port string) ReferenceKeyWithP } // WriteDestination adds destination reference to the cache. -func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) { +func (c *DestinationsCache) WriteDestination(d intermediate.CombinedDestinationRef) { // Check that reference is a catalog.Service type. if !resource.EqualType(catalog.ServiceType, d.ServiceRef.Type) { panic("ref must of type catalog.Service") @@ -68,7 +68,7 @@ func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) { } // DeleteDestination deletes a given destination reference and port from cache. -func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) { +func (c *DestinationsCache) DeleteDestination(ref *pbresource.Reference, port string) { // Check that reference is a catalog.Service type. 
if !resource.EqualType(catalog.ServiceType, ref.Type) { panic("ref must of type catalog.Service") @@ -80,7 +80,7 @@ func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) { c.deleteLocked(ref, port) } -func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) { +func (c *DestinationsCache) addLocked(d intermediate.CombinedDestinationRef) { key := KeyFromRefAndPort(d.ServiceRef, d.Port) c.store[key] = d @@ -96,7 +96,7 @@ func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) { } } -func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) { +func (c *DestinationsCache) deleteLocked(ref *pbresource.Reference, port string) { key := KeyFromRefAndPort(ref, port) // First get it from the store. @@ -117,7 +117,7 @@ func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) { } // DeleteSourceProxy deletes the source proxy given by id from the cache. -func (c *Cache) DeleteSourceProxy(id *pbresource.ID) { +func (c *DestinationsCache) DeleteSourceProxy(id *pbresource.ID) { // Check that id is the ProxyStateTemplate type. if !resource.EqualType(types.ProxyStateTemplateType, id.Type) { panic("id must of type mesh.ProxyStateTemplate") @@ -148,7 +148,7 @@ func (c *Cache) DeleteSourceProxy(id *pbresource.ID) { } // ReadDestination returns a destination reference for the given service reference and port. -func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) { +func (c *DestinationsCache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) { // Check that reference is a catalog.Service type. if !resource.EqualType(catalog.ServiceType, ref.Type) { panic("ref must of type catalog.Service") @@ -164,7 +164,7 @@ func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (interme } // DestinationsBySourceProxy returns all destinations that are a referenced by the given source proxy id. 
-func (c *Cache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef { +func (c *DestinationsCache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef { // Check that id is the ProxyStateTemplate type. if !resource.EqualType(types.ProxyStateTemplateType, id.Type) { panic("id must of type mesh.ProxyStateTemplate") diff --git a/internal/mesh/internal/cache/sidecarproxycache/cache_test.go b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go similarity index 83% rename from internal/mesh/internal/cache/sidecarproxycache/cache_test.go rename to internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go index e2d634c98e..529a498b43 100644 --- a/internal/mesh/internal/cache/sidecarproxycache/cache_test.go +++ b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go @@ -3,19 +3,21 @@ package sidecarproxycache import ( "testing" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/resourcetest" "github.com/hashicorp/consul/proto-public/pbresource" - "github.com/stretchr/testify/require" ) func TestWrite_Create(t *testing.T) { - cache := New() + cache := NewDestinationsCache() - proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc"). 
+ WithTenancy(resource.DefaultNamespacedTenancy()).ID() destination := testDestination(proxyID) cache.WriteDestination(destination) @@ -34,9 +36,10 @@ func TestWrite_Create(t *testing.T) { } func TestWrite_Update(t *testing.T) { - cache := New() + cache := NewDestinationsCache() - proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() destination1 := testDestination(proxyID) cache.WriteDestination(destination1) @@ -58,7 +61,8 @@ func TestWrite_Update(t *testing.T) { // Add another destination for a different proxy. anotherProxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-def").ID() destination3 := testDestination(anotherProxyID) - destination3.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-3").ReferenceNoSection() + destination3.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-3"). + WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection() cache.WriteDestination(destination3) actualSourceProxies = cache.sourceProxiesIndex @@ -91,15 +95,17 @@ func TestWrite_Update(t *testing.T) { } func TestWrite_Delete(t *testing.T) { - cache := New() + cache := NewDestinationsCache() - proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() destination1 := testDestination(proxyID) cache.WriteDestination(destination1) // Add another destination for the same proxy ID. destination2 := testDestination(proxyID) - destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2"). 
+ WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection() cache.WriteDestination(destination2) cache.DeleteDestination(destination1.ServiceRef, destination1.Port) @@ -117,7 +123,8 @@ func TestWrite_Delete(t *testing.T) { // Try to delete non-existing destination and check that nothing has changed.. cache.DeleteDestination( - resourcetest.Resource(catalog.ServiceType, "does-not-exist").ReferenceNoSection(), + resourcetest.Resource(catalog.ServiceType, "does-not-exist"). + WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection(), "doesn't-matter") require.Contains(t, cache.store, KeyFromRefAndPort(destination2.ServiceRef, destination2.Port)) @@ -125,15 +132,17 @@ func TestWrite_Delete(t *testing.T) { } func TestDeleteSourceProxy(t *testing.T) { - cache := New() + cache := NewDestinationsCache() - proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() destination1 := testDestination(proxyID) cache.WriteDestination(destination1) // Add another destination for the same proxy ID. destination2 := testDestination(proxyID) - destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2"). + WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection() cache.WriteDestination(destination2) cache.DeleteSourceProxy(proxyID) @@ -160,15 +169,17 @@ func TestDeleteSourceProxy(t *testing.T) { } func TestDestinationsBySourceProxy(t *testing.T) { - cache := New() + cache := NewDestinationsCache() - proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc"). 
+ WithTenancy(resource.DefaultNamespacedTenancy()).ID() destination1 := testDestination(proxyID) cache.WriteDestination(destination1) // Add another destination for the same proxy ID. destination2 := testDestination(proxyID) - destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2"). + WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection() cache.WriteDestination(destination2) actualDestinations := cache.DestinationsBySourceProxy(proxyID) @@ -178,9 +189,11 @@ func TestDestinationsBySourceProxy(t *testing.T) { func testDestination(proxyID *pbresource.ID) intermediate.CombinedDestinationRef { return intermediate.CombinedDestinationRef{ - ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service").ReferenceNoSection(), - Port: "tcp", - ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations").ID(), + ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service"). + WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection(), + Port: "tcp", + ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations"). 
+ WithTenancy(resource.DefaultNamespacedTenancy()).ID(), SourceProxies: map[resource.ReferenceKey]struct{}{ resource.NewReferenceKey(proxyID): {}, }, diff --git a/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go new file mode 100644 index 0000000000..4e45486abb --- /dev/null +++ b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go @@ -0,0 +1,41 @@ +package sidecarproxycache + +import ( + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/mappers/bimapper" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// ProxyConfigurationCache tracks mappings between proxy configurations and proxy IDs +// that a configuration applies to. It is the responsibility of the controller to +// keep this cache up-to-date. +type ProxyConfigurationCache struct { + mapper *bimapper.Mapper +} + +func NewProxyConfigurationCache() *ProxyConfigurationCache { + return &ProxyConfigurationCache{ + mapper: bimapper.New(types.ProxyConfigurationType, types.ProxyStateTemplateType), + } +} + +// ProxyConfigurationsByProxyID returns proxy configuration IDs given the id of the proxy state template. +func (c *ProxyConfigurationCache) ProxyConfigurationsByProxyID(id *pbresource.ID) []*pbresource.ID { + return c.mapper.ItemIDsForLink(id) +} + +// TrackProxyConfiguration tracks given proxy configuration ID and the linked proxy state template IDs. +func (c *ProxyConfigurationCache) TrackProxyConfiguration(proxyCfgID *pbresource.ID, proxyIDs []resource.ReferenceOrID) { + c.mapper.TrackItem(proxyCfgID, proxyIDs) +} + +// UntrackProxyConfiguration removes tracking for the given proxy configuration ID. 
+func (c *ProxyConfigurationCache) UntrackProxyConfiguration(proxyCfgID *pbresource.ID) { + c.mapper.UntrackItem(proxyCfgID) +} + +// UntrackProxyID removes tracking for the given proxy state template ID. +func (c *ProxyConfigurationCache) UntrackProxyID(proxyID *pbresource.ID) { + c.mapper.UntrackLink(proxyID) +} diff --git a/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go new file mode 100644 index 0000000000..ec3e8d1250 --- /dev/null +++ b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go @@ -0,0 +1,77 @@ +package sidecarproxycache + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" +) + +func TestProxyConfigurationCache(t *testing.T) { + cache := NewProxyConfigurationCache() + + // Create some proxy configurations. + proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-1"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-2"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + proxyCfg3 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-3"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + + // Create some proxy state templates. + p1 := resourcetest.Resource(types.ProxyStateTemplateType, "w-111"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + p2 := resourcetest.Resource(types.ProxyStateTemplateType, "w-222"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + p3 := resourcetest.Resource(types.ProxyStateTemplateType, "w-333"). 
+ WithTenancy(resource.DefaultNamespacedTenancy()).ID() + p4 := resourcetest.Resource(types.ProxyStateTemplateType, "w-444"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + p5 := resourcetest.Resource(types.ProxyStateTemplateType, "w-555"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID() + + // Track these and make sure there's some overlap. + cache.TrackProxyConfiguration(proxyCfg1, []resource.ReferenceOrID{p1, p2, p4}) + cache.TrackProxyConfiguration(proxyCfg2, []resource.ReferenceOrID{p3, p4, p5}) + cache.TrackProxyConfiguration(proxyCfg3, []resource.ReferenceOrID{p1, p3}) + + // Read proxy configurations by proxy. + requireProxyConfigurations(t, cache, p1, proxyCfg1, proxyCfg3) + requireProxyConfigurations(t, cache, p2, proxyCfg1) + requireProxyConfigurations(t, cache, p3, proxyCfg2, proxyCfg3) + requireProxyConfigurations(t, cache, p4, proxyCfg1, proxyCfg2) + requireProxyConfigurations(t, cache, p5, proxyCfg2) + + // Untrack some proxy IDs. + cache.UntrackProxyID(p1) + + requireProxyConfigurations(t, cache, p1) + + // Untrack some proxy IDs. + cache.UntrackProxyID(p3) + + requireProxyConfigurations(t, cache, p3) + + // Untrack proxy cfg. 
+ cache.UntrackProxyConfiguration(proxyCfg1) + + requireProxyConfigurations(t, cache, p1) // no-op because we untracked it earlier + requireProxyConfigurations(t, cache, p2) + requireProxyConfigurations(t, cache, p3) // no-op because we untracked it earlier + requireProxyConfigurations(t, cache, p4, proxyCfg2) + requireProxyConfigurations(t, cache, p5, proxyCfg2) +} + +func requireProxyConfigurations(t *testing.T, cache *ProxyConfigurationCache, proxyID *pbresource.ID, proxyCfgs ...*pbresource.ID) { + t.Helper() + + actualProxyCfgs := cache.ProxyConfigurationsByProxyID(proxyID) + + require.Len(t, actualProxyCfgs, len(proxyCfgs)) + prototest.AssertElementsMatch(t, proxyCfgs, actualProxyCfgs) +} diff --git a/internal/mesh/internal/controllers/register.go b/internal/mesh/internal/controllers/register.go index 2a31d0725a..e24decb371 100644 --- a/internal/mesh/internal/controllers/register.go +++ b/internal/mesh/internal/controllers/register.go @@ -16,14 +16,19 @@ import ( type Dependencies struct { TrustDomainFetcher sidecarproxy.TrustDomainFetcher + LocalDatacenter string TrustBundleFetcher xds.TrustBundleFetcher ProxyUpdater xds.ProxyUpdater } func Register(mgr *controller.Manager, deps Dependencies) { - c := sidecarproxycache.New() - m := sidecarproxymapper.New(c) mapper := bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) mgr.Register(xds.Controller(mapper, deps.ProxyUpdater, deps.TrustBundleFetcher)) - mgr.Register(sidecarproxy.Controller(c, m, deps.TrustDomainFetcher)) + + destinationsCache := sidecarproxycache.NewDestinationsCache() + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() + m := sidecarproxymapper.New(destinationsCache, proxyCfgCache) + mgr.Register( + sidecarproxy.Controller(destinationsCache, proxyCfgCache, m, deps.TrustDomainFetcher, deps.LocalDatacenter), + ) } diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go b/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go 
index 257b94d520..40008b7702 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go @@ -10,13 +10,25 @@ import ( type Builder struct { id *pbresource.ID proxyStateTemplate *pbmesh.ProxyStateTemplate + proxyCfg *pbmesh.ProxyConfiguration trustDomain string + localDatacenter string + + outboundListenerBuilder *ListenerBuilder } -func New(id *pbresource.ID, identity *pbresource.Reference, trustDomain string) *Builder { +func New(id *pbresource.ID, + identity *pbresource.Reference, + trustDomain string, + dc string, + proxyCfg *pbmesh.ProxyConfiguration, +) *Builder { + return &Builder{ - id: id, - trustDomain: trustDomain, + id: id, + trustDomain: trustDomain, + localDatacenter: dc, + proxyCfg: proxyCfg, proxyStateTemplate: &pbmesh.ProxyStateTemplate{ ProxyState: &pbmesh.ProxyState{ Identity: identity, diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go index 823343b189..4f65a43cf4 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go @@ -4,8 +4,12 @@ package builder import ( + "google.golang.org/protobuf/types/known/wrapperspb" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" @@ -13,9 +17,17 @@ import ( ) func (b *Builder) BuildDestinations(destinations []*intermediate.Destination) *Builder { + if b.proxyCfg.GetDynamicConfig() != nil && + 
b.proxyCfg.DynamicConfig.Mode == pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT { + + b.addOutboundListener(b.proxyCfg.DynamicConfig.TransparentProxy.OutboundListenerPort) + } + for _, destination := range destinations { if destination.Explicit != nil { b.buildExplicitDestination(destination) + } else { + b.buildImplicitDestination(destination) } } @@ -47,6 +59,29 @@ func (b *Builder) buildExplicitDestination(destination *intermediate.Destination } } +func (b *Builder) buildImplicitDestination(destination *intermediate.Destination) { + serviceRef := resource.Reference(destination.ServiceEndpoints.Resource.Owner, "") + clusterName := DestinationClusterName(serviceRef, b.localDatacenter, b.trustDomain) + statPrefix := DestinationStatPrefix(serviceRef, b.localDatacenter) + + // We assume that all endpoints have the same port protocol and name, and so it's sufficient + // to check ports just from the first endpoint. + if len(destination.ServiceEndpoints.Endpoints.Endpoints) > 0 { + // Find the destination proxy's port. + // Endpoints refs will need to route to mesh port instead of the destination port as that + // is the port of the destination's proxy. + meshPortName := findMeshPort(destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports) + + for _, port := range destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports { + b.outboundListenerBuilder. + addRouterWithIPMatch(clusterName, statPrefix, port.Protocol, destination.VirtualIPs). + buildListener(). + addCluster(clusterName, destination.Identities). 
+ addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, meshPortName) + } + } +} + func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *ListenerBuilder { listener := &pbproxystate.Listener{ Direction: pbproxystate.Direction_DIRECTION_OUTBOUND, @@ -77,18 +112,55 @@ func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Lis return b.NewListenerBuilder(listener) } +func (b *Builder) addOutboundListener(port uint32) *ListenerBuilder { + listener := &pbproxystate.Listener{ + Name: xdscommon.OutboundListenerName, + Direction: pbproxystate.Direction_DIRECTION_OUTBOUND, + BindAddress: &pbproxystate.Listener_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "127.0.0.1", + Port: port, + }, + }, + Capabilities: []pbproxystate.Capability{pbproxystate.Capability_CAPABILITY_TRANSPARENT}, + } + + lb := b.NewListenerBuilder(listener) + + // Save outbound listener builder so we can use it in the future. + b.outboundListenerBuilder = lb + + return lb +} + func (l *ListenerBuilder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *ListenerBuilder { + return l.addRouterWithIPMatch(clusterName, statPrefix, protocol, nil) +} + +func (l *ListenerBuilder) addRouterWithIPMatch(clusterName, statPrefix string, protocol pbcatalog.Protocol, vips []string) *ListenerBuilder { // For explicit destinations, we have no filter chain match, and filters are based on port protocol. 
+ router := &pbproxystate.Router{} switch protocol { case pbcatalog.Protocol_PROTOCOL_TCP: - router := &pbproxystate.Router{ - Destination: &pbproxystate.Router_L4{ - L4: &pbproxystate.L4Destination{ - Name: clusterName, - StatPrefix: statPrefix, - }, + router.Destination = &pbproxystate.Router_L4{ + L4: &pbproxystate.L4Destination{ + Name: clusterName, + StatPrefix: statPrefix, }, } + } + + if router.Destination != nil { + for _, vip := range vips { + if router.Match == nil { + router.Match = &pbproxystate.Match{} + } + + router.Match.PrefixRanges = append(router.Match.PrefixRanges, &pbproxystate.CidrRange{ + AddressPrefix: vip, + PrefixLen: &wrapperspb.UInt32Value{Value: 32}, + }) + } l.listener.Routers = append(l.listener.Routers, router) } return l @@ -100,7 +172,7 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso spiffeIDs = append(spiffeIDs, connect.SpiffeIDFromIdentityRef(b.trustDomain, identity)) } - // Create destination cluster + // Create destination cluster. 
cluster := &pbproxystate.Cluster{ Group: &pbproxystate.Cluster_EndpointGroup{ EndpointGroup: &pbproxystate.EndpointGroup{ @@ -130,12 +202,11 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso return b } -func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) *Builder { +func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) { b.proxyStateTemplate.RequiredEndpoints[clusterName] = &pbproxystate.EndpointRef{ Id: serviceEndpointsID, Port: destinationPort, } - return b } func findMeshPort(ports map[string]*pbcatalog.WorkloadPort) string { diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go index ee4be71f6f..f423c1da9e 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go @@ -96,7 +96,100 @@ func TestBuildExplicitDestinations(t *testing.T) { } for name, c := range cases { - proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul"). + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil). + BuildDestinations(c.destinations). + Build() + + actual := protoToJSON(t, proxyTmpl) + expected := golden.Get(t, actual, name) + + require.JSONEq(t, expected, actual) + } +} + +func TestBuildImplicitDestinations(t *testing.T) { + api1Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). + WithOwner( + resourcetest.Resource(catalog.ServiceType, "api-1"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID()). + WithTenancy(resource.DefaultNamespacedTenancy()). + WithData(t, endpointsData).Build() + + api2Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-2"). 
+ WithOwner(resourcetest.Resource(catalog.ServiceType, "api-2"). + WithTenancy(resource.DefaultNamespacedTenancy()).ID()). + WithTenancy(resource.DefaultNamespacedTenancy()). + WithData(t, endpointsData).Build() + + api1Identity := &pbresource.Reference{ + Name: "api1-identity", + Tenancy: api1Endpoints.Id.Tenancy, + } + + api2Identity := &pbresource.Reference{ + Name: "api2-identity", + Tenancy: api2Endpoints.Id.Tenancy, + } + + proxyCfg := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + TransparentProxy: &pbmesh.TransparentProxy{ + OutboundListenerPort: 15001, + }, + }, + } + + destination1 := &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api1Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api1Identity}, + VirtualIPs: []string{"1.1.1.1"}, + } + + destination2 := &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api2Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api2Identity}, + VirtualIPs: []string{"2.2.2.2", "3.3.3.3"}, + } + + destination3 := &intermediate.Destination{ + Explicit: &pbmesh.Upstream{ + DestinationRef: resource.Reference(api1Endpoints.Id, ""), + DestinationPort: "tcp", + Datacenter: "dc1", + ListenAddr: &pbmesh.Upstream_IpPort{ + IpPort: &pbmesh.IPPortAddress{Ip: "1.1.1.1", Port: 1234}, + }, + }, + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api1Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api1Identity}, + } + + cases := map[string]struct { + destinations []*intermediate.Destination + }{ + "l4-single-implicit-destination-tproxy": { + destinations: []*intermediate.Destination{destination1}, + }, + "l4-multiple-implicit-destinations-tproxy": { + destinations: []*intermediate.Destination{destination1, destination2}, + }, + "l4-implicit-and-explicit-destinations-tproxy": { + 
destinations: []*intermediate.Destination{destination2, destination3}, + }, + } + + for name, c := range cases { + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", proxyCfg). BuildDestinations(c.destinations). Build() diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go index d846a21d13..298cb608b0 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go @@ -68,7 +68,8 @@ func TestBuildLocalApp(t *testing.T) { for name, c := range cases { t.Run(name, func(t *testing.T) { - proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul").BuildLocalApp(c.workload). + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil). + BuildLocalApp(c.workload). Build() actual := protoToJSON(t, proxyTmpl) expected := golden.Get(t, actual, name+".golden") diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden new file mode 100644 index 0000000000..dbda8aebe7 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden @@ -0,0 +1,199 @@ +{ + "proxyState": { + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-1.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + } + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + 
"endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-2.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "direction": 
"DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "name": "api-1:tcp:1.1.1.1:1234", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + } + ] + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + }, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination new file mode 100644 index 0000000000..24b06ad29b --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination @@ -0,0 +1,122 @@ +{ + "proxyState": { + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-1.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + } + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-2.default.dc1.internal.foo.consul", + "validationContext": { + 
"spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "name": "api-1:tcp:1.1.1.1:1234", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + }, + { + "direction": "DIRECTION_OUTBOUND", + "name": "api-2:tcp:/path/to/socket", + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ], + "unixSocket": { + "mode": "0666", + "path": "/path/to/socket" + } + } + ] + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + }, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden new file mode 100644 index 0000000000..6c1bfb6a12 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden @@ -0,0 +1,360 @@ +{ + "proxyState": { + "clusters": { + 
"api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-1.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + } + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-2.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { 
+ "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + 
"routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + }, + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + } + } + ] + } + ] + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + }, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git 
a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address new file mode 100644 index 0000000000..b426d287e9 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address @@ -0,0 +1,70 @@ +{ + "proxyState": { + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-1.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "name": "api-1:tcp:1.1.1.1:1234", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + } + ] + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address new file mode 100644 index 0000000000..cc2bd0fdc5 --- /dev/null +++ 
b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address @@ -0,0 +1,70 @@ +{ + "proxyState": { + "clusters": { + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-2.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "direction": "DIRECTION_OUTBOUND", + "name": "api-2:tcp:/path/to/socket", + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ], + "unixSocket": { + "mode": "0666", + "path": "/path/to/socket" + } + } + ] + }, + "requiredEndpoints": { + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden new file mode 100644 index 0000000000..3baaa44f7d --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden @@ -0,0 +1,135 @@ +{ + "proxyState": { + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + 
"outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "sni": "api-1.default.dc1.internal.foo.consul", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + } + } + } + } + } + } + }, + "identity": { + "name": "test-identity", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + } + }, + "listeners": [ + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + } + ] + }, + { + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ], + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "name": "outbound_listener", + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + }, + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + } + } + ] + } + ] + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "tenancy": { + "namespace": "default", + "partition": "default", + "peerName": "local" + }, + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": 
"ServiceEndpoints" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/controller.go b/internal/mesh/internal/controllers/sidecarproxy/controller.go index a99cfcce11..bb8ddc3c7b 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/controller.go +++ b/internal/mesh/internal/controllers/sidecarproxy/controller.go @@ -26,20 +26,33 @@ const ControllerName = "consul.io/sidecar-proxy-controller" type TrustDomainFetcher func() (string, error) -func Controller(cache *sidecarproxycache.Cache, mapper *sidecarproxymapper.Mapper, trustDomainFetcher TrustDomainFetcher) controller.Controller { - if cache == nil || mapper == nil || trustDomainFetcher == nil { - panic("cache, mapper and trust domain fetcher are required") +func Controller(destinationsCache *sidecarproxycache.DestinationsCache, + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache, + mapper *sidecarproxymapper.Mapper, + trustDomainFetcher TrustDomainFetcher, + dc string) controller.Controller { + + if destinationsCache == nil || proxyCfgCache == nil || mapper == nil || trustDomainFetcher == nil { + panic("destinations cache, proxy configuration cache, mapper and trust domain fetcher are required") } return controller.ForType(types.ProxyStateTemplateType). WithWatch(catalog.ServiceEndpointsType, mapper.MapServiceEndpointsToProxyStateTemplate). WithWatch(types.UpstreamsType, mapper.MapDestinationsToProxyStateTemplate). - WithReconciler(&reconciler{cache: cache, getTrustDomain: trustDomainFetcher}) + WithWatch(types.ProxyConfigurationType, mapper.MapProxyConfigurationToProxyStateTemplate). 
+ WithReconciler(&reconciler{ + destinationsCache: destinationsCache, + proxyCfgCache: proxyCfgCache, + getTrustDomain: trustDomainFetcher, + dc: dc, + }) } type reconciler struct { - cache *sidecarproxycache.Cache - getTrustDomain TrustDomainFetcher + destinationsCache *sidecarproxycache.DestinationsCache + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache + getTrustDomain TrustDomainFetcher + dc string } func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { @@ -48,7 +61,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c rt.Logger.Trace("reconciling proxy state template") // Instantiate a data fetcher to fetch all reconciliation data. - dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache} + dataFetcher := fetcher.New(rt.Client, r.destinationsCache, r.proxyCfgCache) // Check if the workload exists. workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID) @@ -87,8 +100,8 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } - // Remove it from cache. - r.cache.DeleteSourceProxy(req.ID) + // Remove it from destinationsCache. + r.destinationsCache.DeleteSourceProxy(req.ID) } rt.Logger.Trace("skipping proxy state template generation because workload is not on the mesh", "workload", workload.Resource.Id) return nil @@ -101,22 +114,39 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } - b := builder.New(req.ID, workloadIdentityRefFromWorkload(workload), trustDomain). + // Fetch proxy configuration. + proxyCfg, err := dataFetcher.FetchAndMergeProxyConfigurations(ctx, req.ID) + if err != nil { + rt.Logger.Error("error fetching proxy and merging proxy configurations", "error", err) + return err + } + b := builder.New(req.ID, identityRefFromWorkload(workload), trustDomain, r.dc, proxyCfg). BuildLocalApp(workload.Workload) // Get all destinationsData. 
- destinationsRefs := r.cache.DestinationsBySourceProxy(req.ID) - destinationsData, statuses, err := dataFetcher.FetchDestinationsData(ctx, destinationsRefs) + destinationsRefs := r.destinationsCache.DestinationsBySourceProxy(req.ID) + destinationsData, statuses, err := dataFetcher.FetchExplicitDestinationsData(ctx, destinationsRefs) if err != nil { - rt.Logger.Error("error fetching destinations for this proxy", "error", err) + rt.Logger.Error("error fetching explicit destinations for this proxy", "error", err) return err } + if proxyCfg.IsTransparentProxy() { + destinationsData, err = dataFetcher.FetchImplicitDestinationsData(ctx, req.ID, destinationsData) + if err != nil { + rt.Logger.Error("error fetching implicit destinations for this proxy", "error", err) + return err + } + } + b.BuildDestinations(destinationsData) newProxyTemplate := b.Build() if proxyStateTemplate == nil || !proto.Equal(proxyStateTemplate.Tmpl, newProxyTemplate) { + if proxyStateTemplate == nil { + req.ID.Uid = "" + } proxyTemplateData, err := anypb.New(newProxyTemplate) if err != nil { rt.Logger.Error("error creating proxy state template data", "error", err) @@ -161,7 +191,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return nil } -func workloadIdentityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference { +func identityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference { return &pbresource.Reference{ Name: w.Workload.Identity, Tenancy: w.Resource.Id.Tenancy, diff --git a/internal/mesh/internal/controllers/sidecarproxy/controller_test.go b/internal/mesh/internal/controllers/sidecarproxy/controller_test.go index fbebd244c8..e06f058d2b 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/controller_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/controller_test.go @@ -1,13 +1,18 @@ // Copyright (c) HashiCorp, Inc. 
-// SPDX-License-Identifier: MPL-2.0 +// SPDX-License-Identifier: BUSL-1.1 package sidecarproxy import ( "context" + "strings" "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" @@ -24,8 +29,6 @@ import ( "github.com/hashicorp/consul/proto/private/prototest" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" ) type meshControllerTestSuite struct { @@ -53,7 +56,8 @@ func (suite *meshControllerTestSuite) SetupTest() { suite.ctx = testutil.TestContext(suite.T()) suite.ctl = &reconciler{ - cache: sidecarproxycache.New(), + destinationsCache: sidecarproxycache.NewDestinationsCache(), + proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache(), getTrustDomain: func() (string, error) { return "test.consul", nil }, @@ -78,7 +82,8 @@ func (suite *meshControllerTestSuite) SetupTest() { suite.apiService = resourcetest.Resource(catalog.ServiceType, "api-service"). WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}}, + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}}, + VirtualIps: []string{"1.1.1.1"}, Ports: []*pbcatalog.ServicePort{ {TargetPort: "tcp", Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, }}). @@ -140,7 +145,7 @@ func (suite *meshControllerTestSuite) SetupTest() { Tenancy: suite.apiWorkloadID.Tenancy, } - suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul"). + suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul", "dc1", nil). BuildLocalApp(suite.apiWorkload). 
Build() } @@ -255,17 +260,19 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_N func (suite *meshControllerTestSuite) TestController() { // This is a comprehensive test that checks the overall controller behavior as various resources change state. - // This should test interactions between the reconciler, the mappers, and the cache to ensure they work + // This should test interactions between the reconciler, the mappers, and the destinationsCache to ensure they work // together and produce expected result. // Run the controller manager mgr := controller.NewManager(suite.client, suite.runtime.Logger) - c := sidecarproxycache.New() - m := sidecarproxymapper.New(c) - mgr.Register(Controller(c, m, func() (string, error) { - return "test.consul", nil - })) + // Initialize controller dependencies. + destinationsCache := sidecarproxycache.NewDestinationsCache() + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() + m := sidecarproxymapper.New(destinationsCache, proxyCfgCache) + trustDomainFetcher := func() (string, error) { return "test.consul", nil } + + mgr.Register(Controller(destinationsCache, proxyCfgCache, m, trustDomainFetcher, "dc1")) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -273,96 +280,239 @@ func (suite *meshControllerTestSuite) TestController() { apiProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc").ID() webProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "web-def").ID() - // Check that proxy state template resource is generated for both the api and web workloads. 
- var webProxyStateTemplate *pbresource.Resource - retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceExists(r, apiProxyStateTemplateID) - webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID) + var ( + webProxyStateTemplate *pbresource.Resource + webDestinations *pbresource.Resource + ) + + testutil.RunStep(suite.T(), "proxy state template generation", func(t *testing.T) { + // Check that proxy state template resource is generated for both the api and web workloads. + retry.Run(t, func(r *retry.R) { + suite.client.RequireResourceExists(r, apiProxyStateTemplateID) + webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID) + }) }) - // Add a source service and check that a new proxy state is generated. - webDestinations := resourcetest.Resource(types.UpstreamsType, "web-destinations"). - WithData(suite.T(), &pbmesh.Upstreams{ - Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, - Upstreams: []*pbmesh.Upstream{ - { - DestinationRef: resource.Reference(suite.apiService.Id, ""), - DestinationPort: "tcp", + testutil.RunStep(suite.T(), "add explicit destinations and check that new proxy state is generated", func(t *testing.T) { + // Add a source service and check that a new proxy state is generated. + webDestinations = resourcetest.Resource(types.UpstreamsType, "web-destinations"). 
+ WithData(suite.T(), &pbmesh.Upstreams{ + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, + Upstreams: []*pbmesh.Upstream{ + { + DestinationRef: resource.Reference(suite.apiService.Id, ""), + DestinationPort: "tcp", + ListenAddr: &pbmesh.Upstream_IpPort{ + IpPort: &pbmesh.IPPortAddress{ + Ip: "127.0.0.1", + Port: 1234, + }, + }, + }, }, - }, - }).Write(suite.T(), suite.client) - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + }).Write(suite.T(), suite.client) + webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) - // Update destination's service endpoints and workload to be non-mesh - // and check that: - // * api's proxy state template is deleted - // * we get a new web proxy resource re-generated - // * the status on Upstreams resource is updated with a validation error - nonMeshPorts := map[string]*pbcatalog.WorkloadPort{ - "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - } - - // Note: the order matters here because in reality service endpoints will only - // be reconciled after the workload has been updated, and so we need to write the - // workload before we write service endpoints. - suite.runtime.Logger.Trace("test: updating api-abc workload to be non-mesh") - resourcetest.Resource(catalog.WorkloadType, "api-abc"). - WithData(suite.T(), &pbcatalog.Workload{ - Identity: "api-identity", - Addresses: suite.apiWorkload.Addresses, - Ports: nonMeshPorts}). - Write(suite.T(), suite.client) - - suite.runtime.Logger.Trace("test: updating api-service to be non-mesh") - resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). - WithData(suite.T(), &pbcatalog.ServiceEndpoints{ - Endpoints: []*pbcatalog.Endpoint{ - { - TargetRef: suite.apiWorkloadID, - Addresses: suite.apiWorkload.Addresses, - Ports: nonMeshPorts, - Identity: "api-identity", - }, - }, - }). 
- Write(suite.T(), suite.client.ResourceServiceClient) - - // Check that api proxy template is gone. - retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID) + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) }) - // Check status on the pbmesh.Upstreams resource. - serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) - suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, - status.ConditionMeshProtocolNotFound(serviceRef)) + testutil.RunStep(suite.T(), "update api's ports to be non-mesh", func(t *testing.T) { + // Update destination's service endpoints and workload to be non-mesh + // and check that: + // * api's proxy state template is deleted + // * we get a new web proxy resource re-generated + // * the status on Upstreams resource is updated with a validation error + nonMeshPorts := map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + } - // We should get a new web proxy template resource because this destination should be removed. - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + // Note: the order matters here because in reality service endpoints will only + // be reconciled after the workload has been updated, and so we need to write the + // workload before we write service endpoints. + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), &pbcatalog.Workload{ + Identity: "api-identity", + Addresses: suite.apiWorkload.Addresses, + Ports: nonMeshPorts}). + Write(suite.T(), suite.client) - // Update destination's service apiEndpoints back to mesh and check that we get a new web proxy resource re-generated - // and that the status on Upstreams resource is updated to be empty. - resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). 
- WithData(suite.T(), suite.apiEndpointsData). - Write(suite.T(), suite.client.ResourceServiceClient) + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: suite.apiWorkloadID, + Addresses: suite.apiWorkload.Addresses, + Ports: nonMeshPorts, + Identity: "api-identity", + }, + }, + }). + Write(suite.T(), suite.client.ResourceServiceClient) - suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, - status.ConditionMeshProtocolFound(serviceRef)) + // Check that api proxy template is gone. + retry.Run(t, func(r *retry.R) { + suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID) + }) - // We should also get a new web proxy template resource as this destination should be added again. - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + // Check status on the pbmesh.Upstreams resource. + serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) + suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName, + status.ConditionMeshProtocolNotFound(serviceRef)) - // Delete the proxy state template resource and check that it gets regenerated. - _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID}) - require.NoError(suite.T(), err) + // We should get a new web proxy template resource because this destination should be removed. 
+ webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) - suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + requireExplicitDestinationsNotFound(t, "api", webProxyStateTemplate) + }) + + testutil.RunStep(suite.T(), "update ports to be mesh again", func(t *testing.T) { + // Update destination's service endpoints back to mesh and check that we get a new web proxy resource re-generated + // and that the status on Upstreams resource is updated to be empty. + suite.runtime.Logger.Trace("updating ports to mesh") + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). + Write(suite.T(), suite.client) + + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), suite.apiEndpointsData). + Write(suite.T(), suite.client.ResourceServiceClient) + + serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) + suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName, + status.ConditionMeshProtocolFound(serviceRef)) + + // We should also get a new web proxy template resource as this destination should be added again. + webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) + + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) + + testutil.RunStep(suite.T(), "delete the proxy state template and check re-generation", func(t *testing.T) { + // Delete the proxy state template resource and check that it gets regenerated. 
+ suite.runtime.Logger.Trace("deleting web proxy") + _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID}) + require.NoError(suite.T(), err) + + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) + + testutil.RunStep(suite.T(), "add implicit upstream and enable tproxy", func(t *testing.T) { + // Delete explicit destinations resource. + suite.runtime.Logger.Trace("deleting web destinations") + _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webDestinations.Id}) + require.NoError(t, err) + + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + + // Enable transparent proxy for the web proxy. + resourcetest.Resource(types.ProxyConfigurationType, "proxy-config"). + WithData(t, &pbmesh.ProxyConfiguration{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"web"}, + }, + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + TransparentProxy: &pbmesh.TransparentProxy{ + OutboundListenerPort: 15001, + }, + }, + }).Write(t, suite.client) + + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + + requireImplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) } func TestMeshController(t *testing.T) { suite.Run(t, new(meshControllerTestSuite)) } +func requireExplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + requireExplicitDestinations(t, name, tmplResource, true) +} + +func requireExplicitDestinationsNotFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + requireExplicitDestinations(t, name, tmplResource, false) +} + +func requireExplicitDestinations(t *testing.T, name string, tmplResource *pbresource.Resource, found 
bool) { + t.Helper() + + var tmpl pbmesh.ProxyStateTemplate + err := tmplResource.Data.UnmarshalTo(&tmpl) + require.NoError(t, err) + + // Check outbound listener. + var foundListener bool + for _, l := range tmpl.ProxyState.Listeners { + if strings.Contains(l.Name, name) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND { + foundListener = true + break + } + } + + require.Equal(t, found, foundListener) + + requireClustersAndEndpoints(t, name, &tmpl, found) +} + +func requireImplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + t.Helper() + + var tmpl pbmesh.ProxyStateTemplate + err := tmplResource.Data.UnmarshalTo(&tmpl) + require.NoError(t, err) + + // Check outbound listener. + var foundListener bool + for _, l := range tmpl.ProxyState.Listeners { + if strings.Contains(l.Name, xdscommon.OutboundListenerName) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND { + foundListener = true + + // Check the listener filter chain + for _, r := range l.Routers { + destName := r.Destination.(*pbproxystate.Router_L4).L4.Name + if strings.Contains(destName, name) { + // We expect that there is a filter chain match for transparent proxy destinations. 
+ require.NotNil(t, r.Match) + require.NotEmpty(t, r.Match.PrefixRanges) + break + } + } + break + } + } + require.True(t, foundListener) + + requireClustersAndEndpoints(t, name, &tmpl, true) +} + +func requireClustersAndEndpoints(t *testing.T, name string, tmpl *pbmesh.ProxyStateTemplate, found bool) { + t.Helper() + + var foundCluster bool + for c := range tmpl.ProxyState.Clusters { + if strings.Contains(c, name) { + foundCluster = true + break + } + } + + require.Equal(t, found, foundCluster) + + var foundEndpoints bool + for c := range tmpl.RequiredEndpoints { + if strings.Contains(c, name) { + foundEndpoints = true + break + } + } + + require.Equal(t, found, foundEndpoints) +} + func resourceID(rtype *pbresource.Type, name string) *pbresource.ID { return &pbresource.ID{ Type: rtype, diff --git a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go index 9267ec93e2..088df11942 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go +++ b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go @@ -3,6 +3,10 @@ package fetcher import ( "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" ctrlStatus "github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/status" @@ -12,13 +16,23 @@ import ( pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type Fetcher struct { - Client pbresource.ResourceServiceClient - Cache *sidecarproxycache.Cache + Client pbresource.ResourceServiceClient + DestinationsCache 
*sidecarproxycache.DestinationsCache + ProxyCfgCache *sidecarproxycache.ProxyConfigurationCache +} + +func New(client pbresource.ResourceServiceClient, + dCache *sidecarproxycache.DestinationsCache, + pcfgCache *sidecarproxycache.ProxyConfigurationCache) *Fetcher { + + return &Fetcher{ + Client: client, + DestinationsCache: dCache, + ProxyCfgCache: pcfgCache, + } } func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Workload, error) { @@ -28,7 +42,9 @@ func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*interm case status.Code(err) == codes.NotFound: // We also need to make sure to delete the associated proxy from cache. // We are ignoring errors from cache here as this deletion is best effort. - f.Cache.DeleteSourceProxy(resource.ReplaceType(types.ProxyStateTemplateType, id)) + proxyID := resource.ReplaceType(types.ProxyStateTemplateType, id) + f.DestinationsCache.DeleteSourceProxy(proxyID) + f.ProxyCfgCache.UntrackProxyID(proxyID) return nil, nil case err != nil: return nil, err @@ -96,6 +112,30 @@ func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID) return se, nil } +func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Service, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != nil: + return nil, err + } + + se := &intermediateTypes.Service{ + Resource: rsp.Resource, + } + + var service pbcatalog.Service + err = rsp.Resource.Data.UnmarshalTo(&service) + if err != nil { + return nil, resource.NewErrDataParse(&service, err) + } + + se.Service = &service + return se, nil +} + func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Destinations, error) { rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) @@ -120,9 +160,9 @@ func (f *Fetcher) FetchDestinations(ctx context.Context, 
id *pbresource.ID) (*in return u, nil } -func (f *Fetcher) FetchDestinationsData( +func (f *Fetcher) FetchExplicitDestinationsData( ctx context.Context, - destinationRefs []intermediateTypes.CombinedDestinationRef, + explDestRefs []intermediateTypes.CombinedDestinationRef, ) ([]*intermediateTypes.Destination, map[string]*intermediateTypes.Status, error) { var ( @@ -130,7 +170,7 @@ func (f *Fetcher) FetchDestinationsData( statuses = make(map[string]*intermediateTypes.Status) ) - for _, dest := range destinationRefs { + for _, dest := range explDestRefs { // Fetch Destinations resource if there is one. us, err := f.FetchDestinations(ctx, dest.ExplicitDestinationsID) if err != nil { @@ -141,11 +181,12 @@ func (f *Fetcher) FetchDestinationsData( if us == nil { // If the Destinations resource is not found, then we should delete it from cache and continue. - f.Cache.DeleteDestination(dest.ServiceRef, dest.Port) + f.DestinationsCache.DeleteDestination(dest.ServiceRef, dest.Port) continue } d := &intermediateTypes.Destination{} + // As Destinations resource contains a list of destinations, // we need to find the one that references our service and port. d.Explicit = findDestination(dest.ServiceRef, dest.Port, us.Destinations) @@ -173,7 +214,7 @@ func (f *Fetcher) FetchDestinationsData( d.ServiceEndpoints = se // Check if this endpoints is mesh-enabled. If not, remove it from cache and return an error. - if !IsMeshEnabled(se.Endpoints.Endpoints[0].Ports) { + if len(se.Endpoints.Endpoints) > 0 && !IsMeshEnabled(se.Endpoints.Endpoints[0].Ports) { // Add invalid status but don't remove from cache. If this state changes, // we want to be able to detect this change. updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, @@ -190,7 +231,9 @@ func (f *Fetcher) FetchDestinationsData( // No destination port should point to a port with "mesh" protocol, // so check if destination port has the mesh protocol and update the status. 
- if se.Endpoints.Endpoints[0].Ports[dest.Port].Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + if len(se.Endpoints.Endpoints) > 0 && + se.Endpoints.Endpoints[0].Ports[dest.Port].Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, us.Resource.Status, us.Resource.Generation, ctrlStatus.ConditionMeshProtocolDestinationPort(serviceRef, dest.Port)) continue @@ -217,6 +260,124 @@ func (f *Fetcher) FetchDestinationsData( return destinations, statuses, nil } +// FetchImplicitDestinationsData fetches all implicit destinations and adds them to existing destinations. +// If the implicit destination is already in addToDestinations, it will be skipped. +// todo (ishustava): this function will eventually need to fetch implicit destinations from the ImplicitDestinations resource instead. +func (f *Fetcher) FetchImplicitDestinationsData(ctx context.Context, proxyID *pbresource.ID, addToDestinations []*intermediateTypes.Destination) ([]*intermediateTypes.Destination, error) { + // First, convert existing destinations to a map so we can de-dup. + destinations := make(map[resource.ReferenceKey]*intermediateTypes.Destination) + for _, d := range addToDestinations { + destinations[resource.NewReferenceKey(d.ServiceEndpoints.Resource.Id)] = d + } + + // For now, we need to look up all service endpoints within a partition. + rsp, err := f.Client.List(ctx, &pbresource.ListRequest{ + Type: catalog.ServiceEndpointsType, + Tenancy: &pbresource.Tenancy{ + Namespace: proxyID.Tenancy.Namespace, + Partition: proxyID.Tenancy.Partition, + PeerName: proxyID.Tenancy.PeerName, + }, + }) + if err != nil { + return nil, err + } + + for _, r := range rsp.Resources { + // If it's already in destinations, ignore it. 
+ if _, ok := destinations[resource.NewReferenceKey(r.Id)]; ok { + continue + } + + var endpoints pbcatalog.ServiceEndpoints + err = r.Data.UnmarshalTo(&endpoints) + if err != nil { + return nil, err + } + + // If this proxy is a part of this service, ignore it. + if isPartOfService(resource.ReplaceType(catalog.WorkloadType, proxyID), &endpoints) { + continue + } + + // Skip if this service is not mesh-enabled. + if len(endpoints.Endpoints) > 0 && !IsMeshEnabled(endpoints.Endpoints[0].Ports) { + continue + } + + // Collect all identities. + var identities []*pbresource.Reference + for _, ep := range endpoints.Endpoints { + identities = append(identities, &pbresource.Reference{ + Name: ep.Identity, + Tenancy: r.Id.Tenancy, + }) + } + + // Fetch the service. + // todo (ishustava): this should eventually grab virtual IPs resource. + s, err := f.FetchService(ctx, resource.ReplaceType(catalog.ServiceType, r.Id)) + if err != nil { + return nil, err + } + if s == nil { + // If service no longer exists, skip. + continue + } + + d := &intermediateTypes.Destination{ + ServiceEndpoints: &intermediateTypes.ServiceEndpoints{ + Resource: r, + Endpoints: &endpoints, + }, + VirtualIPs: s.Service.VirtualIps, + Identities: identities, + } + addToDestinations = append(addToDestinations, d) + } + return addToDestinations, err +} + +// FetchAndMergeProxyConfigurations fetches proxy configurations for the proxy state template provided by id +// and merges them into one object. 
+func (f *Fetcher) FetchAndMergeProxyConfigurations(ctx context.Context, id *pbresource.ID) (*pbmesh.ProxyConfiguration, error) { + proxyCfgRefs := f.ProxyCfgCache.ProxyConfigurationsByProxyID(id) + + result := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{}, + } + for _, ref := range proxyCfgRefs { + proxyCfgID := &pbresource.ID{ + Name: ref.GetName(), + Type: ref.GetType(), + Tenancy: ref.GetTenancy(), + } + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{ + Id: proxyCfgID, + }) + switch { + case status.Code(err) == codes.NotFound: + f.ProxyCfgCache.UntrackProxyConfiguration(proxyCfgID) + return nil, nil + case err != nil: + return nil, err + } + + var proxyCfg pbmesh.ProxyConfiguration + err = rsp.Resource.Data.UnmarshalTo(&proxyCfg) + if err != nil { + return nil, err + } + + // Note that we only care about dynamic config as bootstrap config + // will not be updated dynamically by this controller. + // todo (ishustava): do sorting etc. + proto.Merge(result.DynamicConfig, proxyCfg.DynamicConfig) + } + + return result, nil +} + // IsMeshEnabled returns true if the workload or service endpoints port // contain a port with the "mesh" protocol. func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool { @@ -256,3 +417,17 @@ func updateStatusCondition( } } } + +func isPartOfService(workloadID *pbresource.ID, endpoints *pbcatalog.ServiceEndpoints) bool { + // convert IDs to refs so that we can compare without UIDs. 
+ workloadRef := resource.Reference(workloadID, "") + for _, ep := range endpoints.Endpoints { + if ep.TargetRef != nil { + targetRef := resource.Reference(ep.TargetRef, "") + if resource.EqualReference(workloadRef, targetRef) { + return true + } + } + } + return false +} diff --git a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go index e73bc7f8ea..6e7035089d 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go @@ -176,7 +176,8 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() { proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() // Create cache and pre-populate it. - c := sidecarproxycache.New() + destCache := sidecarproxycache.NewDestinationsCache() + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() dest1 := intermediate.CombinedDestinationRef{ ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service-1").ReferenceNoSection(), Port: "tcp", @@ -193,15 +194,19 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() { resource.NewReferenceKey(proxyID): {}, }, } - c.WriteDestination(dest1) - c.WriteDestination(dest2) + destCache.WriteDestination(dest1) + destCache.WriteDestination(dest2) - f := Fetcher{Cache: c, Client: suite.client} + proxyCfgID := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config").ID() + proxyCfgCache.TrackProxyConfiguration(proxyCfgID, []resource.ReferenceOrID{proxyID}) + + f := Fetcher{DestinationsCache: destCache, ProxyCfgCache: proxyCfgCache, Client: suite.client} _, err := f.FetchWorkload(context.Background(), proxyID) require.NoError(suite.T(), err) // Check that cache is updated to remove proxy id. 
- require.Nil(suite.T(), c.DestinationsBySourceProxy(proxyID)) + require.Nil(suite.T(), destCache.DestinationsBySourceProxy(proxyID)) + require.Nil(suite.T(), proxyCfgCache.ProxyConfigurationsByProxyID(proxyID)) } func (suite *dataFetcherSuite) TestFetcher_NotFound() { @@ -236,6 +241,13 @@ func (suite *dataFetcherSuite) TestFetcher_NotFound() { return err }, }, + "service": { + typ: catalog.ServiceType, + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchService(context.Background(), id) + return err + }, + }, } for name, c := range cases { @@ -283,6 +295,13 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() { return err }, }, + "service": { + name: "web-service", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchService(context.Background(), id) + return err + }, + }, } for name, c := range cases { @@ -313,7 +332,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() { } } -func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { +func (suite *dataFetcherSuite) TestFetcher_FetchExplicitDestinationsData() { destination1 := intermediate.CombinedDestinationRef{ ServiceRef: resource.Reference(suite.api1Service.Id, ""), Port: "tcp", @@ -339,14 +358,14 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - c := sidecarproxycache.New() + c := sidecarproxycache.NewDestinationsCache() c.WriteDestination(destination1) c.WriteDestination(destination2) c.WriteDestination(destination3) f := Fetcher{ - Cache: c, - Client: suite.client, + DestinationsCache: c, + Client: suite.client, } suite.T().Run("destinations not found", func(t *testing.T) { @@ -361,7 +380,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationRefNoDestinations) destinationRefs := []intermediate.CombinedDestinationRef{destinationRefNoDestinations} - destinations, _, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, _, err := 
f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) _, foundDest := c.ReadDestination(destinationRefNoDestinations.ServiceRef, destinationRefNoDestinations.Port) @@ -383,7 +402,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationNoServiceEndpoints) destinationRefs := []intermediate.CombinedDestinationRef{destinationNoServiceEndpoints} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) @@ -423,7 +442,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationNonMeshServiceEndpoints) destinationRefs := []intermediate.CombinedDestinationRef{destinationNonMeshServiceEndpoints} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) @@ -458,7 +477,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { destinationRefs := []intermediate.CombinedDestinationRef{destination1} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) serviceRef := resource.ReferenceToString(destination1.ServiceRef) destinationRef := resource.IDToString(destination1.ExplicitDestinationsID) expectedStatus := &intermediate.Status{ @@ -496,7 +515,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - _, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs) + _, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) prototest.AssertDeepEqual(t, expectedStatus, 
statuses[destinationRef]) }) @@ -514,7 +533,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationMeshDestinationPort) destinationRefs := []intermediate.CombinedDestinationRef{destinationMeshDestinationPort} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) serviceRef := resource.ReferenceToString(destination1.ServiceRef) destinationRef := resource.IDToString(destination1.ExplicitDestinationsID) expectedStatus := &intermediate.Status{ @@ -553,7 +572,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - _, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs) + _, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef]) }) @@ -610,7 +629,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { meshStatus.ConditionNonMeshProtocolDestinationPort(ref, d.Port)) } - actualDestinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + actualDestinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) // Check that all statuses have "happy" conditions. 
@@ -622,6 +641,152 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }) } +func (suite *dataFetcherSuite) TestFetcher_FetchImplicitDestinationsData() { + existingDestinations := []*intermediate.Destination{ + { + Explicit: suite.webDestinationsData.Upstreams[0], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api1ServiceEndpoints, + Endpoints: suite.api1ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-1-identity", + Tenancy: suite.api1Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[1], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[2], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + } + + // Create a few other services to be implicit upstreams. + api3Service := resourcetest.Resource(catalog.ServiceType, "api-3"). + WithData(suite.T(), &pbcatalog.Service{ + VirtualIps: []string{"192.1.1.1"}, + }). 
+ Write(suite.T(), suite.client) + + api3ServiceEndpointsData := &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: &pbresource.ID{ + Name: "api-3-abc", + Tenancy: api3Service.Id.Tenancy, + Type: catalog.WorkloadType, + }, + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "mesh": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + Identity: "api-3-identity", + }, + }, + } + api3ServiceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-3"). + WithData(suite.T(), api3ServiceEndpointsData).Write(suite.T(), suite.client) + + f := Fetcher{ + Client: suite.client, + } + + expDestinations := append(existingDestinations, &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api3ServiceEndpoints, + Endpoints: api3ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-3-identity", + Tenancy: api3Service.Id.Tenancy, + }, + }, + VirtualIPs: []string{"192.1.1.1"}, + }) + + actualDestinations, err := f.FetchImplicitDestinationsData(context.Background(), suite.webProxy.Id, existingDestinations) + require.NoError(suite.T(), err) + + prototest.AssertElementsMatch(suite.T(), expDestinations, actualDestinations) +} + +func (suite *dataFetcherSuite) TestFetcher_FetchAndMergeProxyConfigurations() { + // Create some proxy configurations. + proxyCfg1Data := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + }, + } + + proxyCfg2Data := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT, + }, + } + + proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "config-1"). + WithData(suite.T(), proxyCfg1Data). 
+ Write(suite.T(), suite.client) + + proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "config-2"). + WithData(suite.T(), proxyCfg2Data). + Write(suite.T(), suite.client) + + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() + proxyCfgCache.TrackProxyConfiguration(proxyCfg1.Id, []resource.ReferenceOrID{suite.webProxy.Id}) + proxyCfgCache.TrackProxyConfiguration(proxyCfg2.Id, []resource.ReferenceOrID{suite.webProxy.Id}) + + expectedProxyCfg := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT, + }, + } + + fetcher := Fetcher{Client: suite.client, ProxyCfgCache: proxyCfgCache} + + actualProxyCfg, err := fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id) + require.NoError(suite.T(), err) + prototest.AssertDeepEqual(suite.T(), expectedProxyCfg, actualProxyCfg) + + // Delete proxy cfg and check that the cache gets updated. 
+ _, err = suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: proxyCfg1.Id}) + require.NoError(suite.T(), err) + + _, err = fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id) + require.NoError(suite.T(), err) + + proxyCfg2.Id.Uid = "" + prototest.AssertElementsMatch(suite.T(), + []*pbresource.ID{proxyCfg2.Id}, + fetcher.ProxyCfgCache.ProxyConfigurationsByProxyID(suite.webProxy.Id)) +} + func TestDataFetcher(t *testing.T) { suite.Run(t, new(dataFetcherSuite)) } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go index a0a36a6f48..0d2531a516 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go @@ -3,9 +3,7 @@ package sidecarproxymapper import ( "context" - "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" @@ -21,38 +19,15 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con // Look up workloads for this destinations. 
sourceProxyIDs := make(map[resource.ReferenceKey]struct{}) - var result []controller.Request - for _, prefix := range destinations.Workloads.Prefixes { - resp, err := rt.Client.List(ctx, &pbresource.ListRequest{ - Type: catalog.WorkloadType, - Tenancy: res.Id.Tenancy, - NamePrefix: prefix, - }) - if err != nil { - return nil, err - } - for _, r := range resp.Resources { - proxyID := resource.ReplaceType(types.ProxyStateTemplateType, r.Id) - sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{} - result = append(result, controller.Request{ - ID: proxyID, - }) - } + + requests, err := mapSelectorToProxyStateTemplates(ctx, rt.Client, destinations.Workloads, res.Id.Tenancy, func(id *pbresource.ID) { + sourceProxyIDs[resource.NewReferenceKey(id)] = struct{}{} + }) + if err != nil { + return nil, err } - for _, name := range destinations.Workloads.Names { - proxyID := &pbresource.ID{ - Name: name, - Tenancy: res.Id.Tenancy, - Type: types.ProxyStateTemplateType, - } - sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{} - result = append(result, controller.Request{ - ID: proxyID, - }) - } - - // Add this destination to cache. + // Add this destination to destinationsCache. 
for _, destination := range destinations.Upstreams { destinationRef := intermediate.CombinedDestinationRef{ ServiceRef: destination.DestinationRef, @@ -60,8 +35,8 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con ExplicitDestinationsID: res.Id, SourceProxies: sourceProxyIDs, } - m.cache.WriteDestination(destinationRef) + m.destinationsCache.WriteDestination(destinationRef) } - return result, nil + return requests, nil } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go index 07a7939404..f53071fe38 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go @@ -64,8 +64,8 @@ func TestMapDestinationsToProxyStateTemplate(t *testing.T) { WithData(t, webDestinationsData). Write(t, client) - c := sidecarproxycache.New() - mapper := &Mapper{cache: c} + c := sidecarproxycache.NewDestinationsCache() + mapper := &Mapper{destinationsCache: c} expRequests := []controller.Request{ {ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload1.Id)}, diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go new file mode 100644 index 0000000000..448b4a5eb5 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go @@ -0,0 +1,67 @@ +package sidecarproxymapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +type 
Mapper struct { + destinationsCache *sidecarproxycache.DestinationsCache + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache +} + +func New(destinationsCache *sidecarproxycache.DestinationsCache, proxyCfgCache *sidecarproxycache.ProxyConfigurationCache) *Mapper { + return &Mapper{ + destinationsCache: destinationsCache, + proxyCfgCache: proxyCfgCache, + } +} + +// mapSelectorToProxyStateTemplates returns ProxyStateTemplate requests given a workload +// selector and tenancy. The cacheFunc can be called if the resulting ids need to be cached. +func mapSelectorToProxyStateTemplates(ctx context.Context, + client pbresource.ResourceServiceClient, + selector *pbcatalog.WorkloadSelector, + tenancy *pbresource.Tenancy, + cacheFunc func(id *pbresource.ID)) ([]controller.Request, error) { + var result []controller.Request + + for _, prefix := range selector.Prefixes { + resp, err := client.List(ctx, &pbresource.ListRequest{ + Type: catalog.WorkloadType, + Tenancy: tenancy, + NamePrefix: prefix, + }) + if err != nil { + return nil, err + } + for _, r := range resp.Resources { + id := resource.ReplaceType(types.ProxyStateTemplateType, r.Id) + result = append(result, controller.Request{ + ID: id, + }) + cacheFunc(id) + } + } + + for _, name := range selector.Names { + id := &pbresource.ID{ + Name: name, + Tenancy: tenancy, + Type: types.ProxyStateTemplateType, + } + result = append(result, controller.Request{ + ID: id, + }) + cacheFunc(id) + } + + return result, nil +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go new file mode 100644 index 0000000000..108b023b33 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go @@ -0,0 +1,76 @@ +package sidecarproxymapper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + 
"github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" +) + +func TestMapWorkloadsBySelector(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + + // Create some workloads. + // For this test, we don't care about the workload data, so we will re-use + // the same data for all workloads. + workloadData := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}}, + } + w1 := resourcetest.Resource(catalog.WorkloadType, "w1"). + WithData(t, workloadData). + Write(t, client).Id + w2 := resourcetest.Resource(catalog.WorkloadType, "w2"). + WithData(t, workloadData). + Write(t, client).Id + w3 := resourcetest.Resource(catalog.WorkloadType, "prefix-w3"). + WithData(t, workloadData). + Write(t, client).Id + w4 := resourcetest.Resource(catalog.WorkloadType, "prefix-w4"). + WithData(t, workloadData). + Write(t, client).Id + // This workload should not be used as it's not selected by the workload selector. + resourcetest.Resource(catalog.WorkloadType, "not-selected-workload"). + WithData(t, workloadData). 
+ Write(t, client) + + selector := &pbcatalog.WorkloadSelector{ + Names: []string{"w1", "w2"}, + Prefixes: []string{"prefix"}, + } + expReqs := []controller.Request{ + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w1)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w2)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w3)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w4)}, + } + + var cachedReqs []controller.Request + + reqs, err := mapSelectorToProxyStateTemplates(context.Background(), client, selector, defaultTenancy(), func(id *pbresource.ID) { + // save IDs to check that the cache func is called + cachedReqs = append(cachedReqs, controller.Request{ID: id}) + }) + require.NoError(t, err) + require.Len(t, reqs, len(expReqs)) + prototest.AssertElementsMatch(t, expReqs, reqs) + prototest.AssertElementsMatch(t, expReqs, cachedReqs) +} + +func defaultTenancy() *pbresource.Tenancy { + return &pbresource.Tenancy{ + Namespace: "default", + Partition: "default", + PeerName: "local", + } +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go new file mode 100644 index 0000000000..e83bdedeef --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go @@ -0,0 +1,31 @@ +package sidecarproxymapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func (m *Mapper) MapProxyConfigurationToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + var proxyConfig pbmesh.ProxyConfiguration + err := res.Data.UnmarshalTo(&proxyConfig) + if err != nil { + return nil, err + } + + var proxyIDs 
[]resource.ReferenceOrID + + requests, err := mapSelectorToProxyStateTemplates(ctx, rt.Client, proxyConfig.Workloads, res.Id.Tenancy, func(id *pbresource.ID) { + proxyIDs = append(proxyIDs, id) + }) + if err != nil { + return nil, err + } + + m.proxyCfgCache.TrackProxyConfiguration(res.Id, proxyIDs) + + return requests, nil +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go new file mode 100644 index 0000000000..b1e355c5cb --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go @@ -0,0 +1,73 @@ +package sidecarproxymapper + +import ( + "context" + "testing" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestProxyConfigurationMapper(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + + // Create some workloads. + // For this test, we don't care about the workload data, so we will re-use + // the same data for all workloads. + workloadData := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}}, + } + w1 := resourcetest.Resource(catalog.WorkloadType, "w1"). 
+ WithData(t, workloadData). + Write(t, client).Id + w2 := resourcetest.Resource(catalog.WorkloadType, "w2"). + WithData(t, workloadData). + Write(t, client).Id + + // Create proxy configuration. + proxyCfgData := &pbmesh.ProxyConfiguration{ + Workloads: &pbcatalog.WorkloadSelector{ + Names: []string{"w1", "w2"}, + }, + } + pCfg := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config"). + WithData(t, proxyCfgData). + Write(t, client) + + m := Mapper{proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache()} + reqs, err := m.MapProxyConfigurationToProxyStateTemplate(context.Background(), controller.Runtime{ + Client: client, + }, pCfg) + require.NoError(t, err) + + p1 := resource.ReplaceType(types.ProxyStateTemplateType, w1) + p2 := resource.ReplaceType(types.ProxyStateTemplateType, w2) + expReqs := []controller.Request{ + {ID: p1}, + {ID: p2}, + } + prototest.AssertElementsMatch(t, expReqs, reqs) + + // Check that the cache is populated. + + // Clean out UID as we don't care about it in the cache. 
+ pCfg.Id.Uid = "" + prototest.AssertElementsMatch(t, + []*pbresource.ID{pCfg.Id}, + m.proxyCfgCache.ProxyConfigurationsByProxyID(p1)) + + prototest.AssertElementsMatch(t, + []*pbresource.ID{pCfg.Id}, + m.proxyCfgCache.ProxyConfigurationsByProxyID(p2)) +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go index 3ba78dbd38..9d91edb31c 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go @@ -5,26 +5,16 @@ import ( "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" ) -type Mapper struct { - cache *sidecarproxycache.Cache -} - -func New(c *sidecarproxycache.Cache) *Mapper { - return &Mapper{ - cache: c, - } -} - // MapServiceEndpointsToProxyStateTemplate maps catalog.ServiceEndpoints objects to the IDs of // ProxyStateTemplate. -func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { +func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { // This mapper needs to look up workload IDs from service endpoints and replace them with ProxyStateTemplate type. 
var serviceEndpoints pbcatalog.ServiceEndpoints err := res.Data.UnmarshalTo(&serviceEndpoints) @@ -68,7 +58,7 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co continue } serviceRef := resource.Reference(serviceID, "") - if destination, ok := m.cache.ReadDestination(serviceRef, portName); ok { + if destination, ok := m.destinationsCache.ReadDestination(serviceRef, portName); ok { for refKey := range destination.SourceProxies { result = append(result, controller.Request{ID: refKey.ToID()}) } @@ -76,5 +66,24 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co } } + // todo (ishustava): this is a stub for now until we implement implicit destinations. + // For tproxy, we generate requests for all proxy states in the cluster. + // This will generate duplicate events for proxies already added above, + // however, we expect that the controller runtime will de-dup for us. + rsp, err := rt.Client.List(ctx, &pbresource.ListRequest{ + Type: types.ProxyStateTemplateType, + Tenancy: &pbresource.Tenancy{ + Namespace: storage.Wildcard, + Partition: res.Id.Tenancy.Partition, + PeerName: res.Id.Tenancy.PeerName, + }, + }) + if err != nil { + return nil, err + } + for _, r := range rsp.Resources { + result = append(result, controller.Request{ID: r.Id}) + } + return result, err } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go index 6c17d6b43a..ab9113f750 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" 
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" @@ -18,6 +19,7 @@ import ( ) func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) workload1 := resourcetest.Resource(catalog.WorkloadType, "workload-1"). WithTenancy(resource.DefaultNamespacedTenancy()).Build() workload2 := resourcetest.Resource(catalog.WorkloadType, "workload-2"). @@ -50,8 +52,8 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { proxyTmpl2ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-2"). WithTenancy(resource.DefaultNamespacedTenancy()).ID() - c := sidecarproxycache.New() - mapper := &Mapper{cache: c} + c := sidecarproxycache.NewDestinationsCache() + mapper := &Mapper{destinationsCache: c} sourceProxy1 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-3"). WithTenancy(resource.DefaultNamespacedTenancy()).ID() sourceProxy2 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-4"). 
@@ -95,7 +97,7 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { {ID: sourceProxy3}, } - requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{}, serviceEndpoints) + requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{Client: client}, serviceEndpoints) require.NoError(t, err) prototest.AssertElementsMatch(t, expRequests, requests) } diff --git a/internal/mesh/internal/types/intermediate/types.go b/internal/mesh/internal/types/intermediate/types.go index 93ae9df4e9..24ac20ca93 100644 --- a/internal/mesh/internal/types/intermediate/types.go +++ b/internal/mesh/internal/types/intermediate/types.go @@ -29,6 +29,11 @@ type ServiceEndpoints struct { Endpoints *pbcatalog.ServiceEndpoints } +type Service struct { + Resource *pbresource.Resource + Service *pbcatalog.Service +} + type Destinations struct { Resource *pbresource.Resource Destinations *pbmesh.Upstreams @@ -53,6 +58,7 @@ type Destination struct { Explicit *pbmesh.Upstream ServiceEndpoints *ServiceEndpoints Identities []*pbresource.Reference + VirtualIPs []string } type Status struct { diff --git a/proto-public/pbmesh/v1alpha1/proxy_configuration_addon.go b/proto-public/pbmesh/v1alpha1/proxy_configuration_addon.go new file mode 100644 index 0000000000..73dad9db15 --- /dev/null +++ b/proto-public/pbmesh/v1alpha1/proxy_configuration_addon.go @@ -0,0 +1,6 @@ +package meshv1alpha1 + +func (p *ProxyConfiguration) IsTransparentProxy() bool { + return p.GetDynamicConfig() != nil && + p.DynamicConfig.Mode == ProxyMode_PROXY_MODE_TRANSPARENT +} diff --git a/proto-public/pbmesh/v1alpha1/proxy_configuration_addon_test.go b/proto-public/pbmesh/v1alpha1/proxy_configuration_addon_test.go new file mode 100644 index 0000000000..c2b9224fa9 --- /dev/null +++ b/proto-public/pbmesh/v1alpha1/proxy_configuration_addon_test.go @@ -0,0 +1,49 @@ +package meshv1alpha1 + +import ( + "testing" + + 
 "github.com/stretchr/testify/require" +) + +func TestIsTransparentProxy(t *testing.T) { + cases := map[string]struct { + proxyCfg *ProxyConfiguration + exp bool + }{ + "nil dynamic config": { + proxyCfg: &ProxyConfiguration{}, + exp: false, + }, + "default mode": { + proxyCfg: &ProxyConfiguration{ + DynamicConfig: &DynamicConfig{ + Mode: ProxyMode_PROXY_MODE_DEFAULT, + }, + }, + exp: false, + }, + "direct mode": { + proxyCfg: &ProxyConfiguration{ + DynamicConfig: &DynamicConfig{ + Mode: ProxyMode_PROXY_MODE_DIRECT, + }, + }, + exp: false, + }, + "transparent mode": { + proxyCfg: &ProxyConfiguration{ + DynamicConfig: &DynamicConfig{ + Mode: ProxyMode_PROXY_MODE_TRANSPARENT, + }, + }, + exp: true, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require.Equal(t, c.exp, c.proxyCfg.IsTransparentProxy()) + }) + } +}