sidecar-proxy controller: Add support for transparent proxy (NET-5069) (#18458)

This commit adds support for transparent proxy to the sidecar proxy controller. As we do not yet support inferring destinations from intentions, this assumes that all services in the cluster are destinations.
This commit is contained in:
Iryna Shustava 2023-09-08 16:18:01 -06:00 committed by GitHub
parent ed79c60e78
commit 1557e1d6a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2309 additions and 225 deletions

View File

@ -919,6 +919,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) {
return s.getTrustDomain(caConfig)
},
LocalDatacenter: s.config.Datacenter,
})
}

View File

@ -10,13 +10,13 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
// Cache stores information needed for the sidecar-proxy controller to reconcile efficiently.
// DestinationsCache stores information needed for the sidecar-proxy controller to reconcile efficiently.
// This currently means storing a list of all destinations for easy look up
// as well as indices of source proxies where those destinations are referenced.
//
// It is the responsibility of the controller and its subcomponents (like mapper and data fetcher)
// to keep this cache up-to-date as we're observing new data.
type Cache struct {
type DestinationsCache struct {
lock sync.RWMutex
// store is a map from destination service reference and port as a reference key
@ -30,8 +30,8 @@ type Cache struct {
type storeKeys map[ReferenceKeyWithPort]struct{}
func New() *Cache {
return &Cache{
func NewDestinationsCache() *DestinationsCache {
return &DestinationsCache{
store: make(map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef),
sourceProxiesIndex: make(map[resource.ReferenceKey]storeKeys),
}
@ -48,7 +48,7 @@ func KeyFromRefAndPort(ref *pbresource.Reference, port string) ReferenceKeyWithP
}
// WriteDestination adds destination reference to the cache.
func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) {
func (c *DestinationsCache) WriteDestination(d intermediate.CombinedDestinationRef) {
// Check that reference is a catalog.Service type.
if !resource.EqualType(catalog.ServiceType, d.ServiceRef.Type) {
panic("ref must of type catalog.Service")
@ -68,7 +68,7 @@ func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) {
}
// DeleteDestination deletes a given destination reference and port from cache.
func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) {
func (c *DestinationsCache) DeleteDestination(ref *pbresource.Reference, port string) {
// Check that reference is a catalog.Service type.
if !resource.EqualType(catalog.ServiceType, ref.Type) {
panic("ref must of type catalog.Service")
@ -80,7 +80,7 @@ func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) {
c.deleteLocked(ref, port)
}
func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) {
func (c *DestinationsCache) addLocked(d intermediate.CombinedDestinationRef) {
key := KeyFromRefAndPort(d.ServiceRef, d.Port)
c.store[key] = d
@ -96,7 +96,7 @@ func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) {
}
}
func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) {
func (c *DestinationsCache) deleteLocked(ref *pbresource.Reference, port string) {
key := KeyFromRefAndPort(ref, port)
// First get it from the store.
@ -117,7 +117,7 @@ func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) {
}
// DeleteSourceProxy deletes the source proxy given by id from the cache.
func (c *Cache) DeleteSourceProxy(id *pbresource.ID) {
func (c *DestinationsCache) DeleteSourceProxy(id *pbresource.ID) {
// Check that id is the ProxyStateTemplate type.
if !resource.EqualType(types.ProxyStateTemplateType, id.Type) {
panic("id must of type mesh.ProxyStateTemplate")
@ -148,7 +148,7 @@ func (c *Cache) DeleteSourceProxy(id *pbresource.ID) {
}
// ReadDestination returns a destination reference for the given service reference and port.
func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) {
func (c *DestinationsCache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) {
// Check that reference is a catalog.Service type.
if !resource.EqualType(catalog.ServiceType, ref.Type) {
panic("ref must of type catalog.Service")
@ -164,7 +164,7 @@ func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (interme
}
// DestinationsBySourceProxy returns all destinations that are a referenced by the given source proxy id.
func (c *Cache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef {
func (c *DestinationsCache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef {
// Check that id is the ProxyStateTemplate type.
if !resource.EqualType(types.ProxyStateTemplateType, id.Type) {
panic("id must of type mesh.ProxyStateTemplate")

View File

@ -3,19 +3,21 @@ package sidecarproxycache
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/mesh/internal/types/intermediate"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/stretchr/testify/require"
)
func TestWrite_Create(t *testing.T) {
cache := New()
cache := NewDestinationsCache()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
destination := testDestination(proxyID)
cache.WriteDestination(destination)
@ -34,9 +36,10 @@ func TestWrite_Create(t *testing.T) {
}
func TestWrite_Update(t *testing.T) {
cache := New()
cache := NewDestinationsCache()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
destination1 := testDestination(proxyID)
cache.WriteDestination(destination1)
@ -58,7 +61,8 @@ func TestWrite_Update(t *testing.T) {
// Add another destination for a different proxy.
anotherProxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-def").ID()
destination3 := testDestination(anotherProxyID)
destination3.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-3").ReferenceNoSection()
destination3.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-3").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection()
cache.WriteDestination(destination3)
actualSourceProxies = cache.sourceProxiesIndex
@ -91,15 +95,17 @@ func TestWrite_Update(t *testing.T) {
}
func TestWrite_Delete(t *testing.T) {
cache := New()
cache := NewDestinationsCache()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
destination1 := testDestination(proxyID)
cache.WriteDestination(destination1)
// Add another destination for the same proxy ID.
destination2 := testDestination(proxyID)
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection()
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection()
cache.WriteDestination(destination2)
cache.DeleteDestination(destination1.ServiceRef, destination1.Port)
@ -117,7 +123,8 @@ func TestWrite_Delete(t *testing.T) {
// Try to delete non-existing destination and check that nothing has changed..
cache.DeleteDestination(
resourcetest.Resource(catalog.ServiceType, "does-not-exist").ReferenceNoSection(),
resourcetest.Resource(catalog.ServiceType, "does-not-exist").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection(),
"doesn't-matter")
require.Contains(t, cache.store, KeyFromRefAndPort(destination2.ServiceRef, destination2.Port))
@ -125,15 +132,17 @@ func TestWrite_Delete(t *testing.T) {
}
func TestDeleteSourceProxy(t *testing.T) {
cache := New()
cache := NewDestinationsCache()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
destination1 := testDestination(proxyID)
cache.WriteDestination(destination1)
// Add another destination for the same proxy ID.
destination2 := testDestination(proxyID)
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection()
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection()
cache.WriteDestination(destination2)
cache.DeleteSourceProxy(proxyID)
@ -160,15 +169,17 @@ func TestDeleteSourceProxy(t *testing.T) {
}
func TestDestinationsBySourceProxy(t *testing.T) {
cache := New()
cache := NewDestinationsCache()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
destination1 := testDestination(proxyID)
cache.WriteDestination(destination1)
// Add another destination for the same proxy ID.
destination2 := testDestination(proxyID)
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection()
destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection()
cache.WriteDestination(destination2)
actualDestinations := cache.DestinationsBySourceProxy(proxyID)
@ -178,9 +189,11 @@ func TestDestinationsBySourceProxy(t *testing.T) {
func testDestination(proxyID *pbresource.ID) intermediate.CombinedDestinationRef {
return intermediate.CombinedDestinationRef{
ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service").ReferenceNoSection(),
Port: "tcp",
ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations").ID(),
ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service").
WithTenancy(resource.DefaultNamespacedTenancy()).ReferenceNoSection(),
Port: "tcp",
ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations").
WithTenancy(resource.DefaultNamespacedTenancy()).ID(),
SourceProxies: map[resource.ReferenceKey]struct{}{
resource.NewReferenceKey(proxyID): {},
},

View File

@ -0,0 +1,41 @@
package sidecarproxycache
import (
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// ProxyConfigurationCache tracks mappings between proxy configurations and proxy IDs
// that a configuration applies to. It is the responsibility of the controller to
// keep this cache up-to-date.
type ProxyConfigurationCache struct {
mapper *bimapper.Mapper
}
func NewProxyConfigurationCache() *ProxyConfigurationCache {
return &ProxyConfigurationCache{
mapper: bimapper.New(types.ProxyConfigurationType, types.ProxyStateTemplateType),
}
}
// ProxyConfigurationsByProxyID returns proxy configuration IDs given the id of the proxy state template.
func (c *ProxyConfigurationCache) ProxyConfigurationsByProxyID(id *pbresource.ID) []*pbresource.ID {
return c.mapper.ItemIDsForLink(id)
}
// TrackProxyConfiguration tracks given proxy configuration ID and the linked proxy state template IDs.
func (c *ProxyConfigurationCache) TrackProxyConfiguration(proxyCfgID *pbresource.ID, proxyIDs []resource.ReferenceOrID) {
c.mapper.TrackItem(proxyCfgID, proxyIDs)
}
// UntrackProxyConfiguration removes tracking for the given proxy configuration ID.
func (c *ProxyConfigurationCache) UntrackProxyConfiguration(proxyCfgID *pbresource.ID) {
c.mapper.UntrackItem(proxyCfgID)
}
// UntrackProxyID removes tracking for the given proxy state template ID.
func (c *ProxyConfigurationCache) UntrackProxyID(proxyID *pbresource.ID) {
c.mapper.UntrackLink(proxyID)
}

View File

@ -0,0 +1,77 @@
package sidecarproxycache
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
)
func TestProxyConfigurationCache(t *testing.T) {
cache := NewProxyConfigurationCache()
// Create some proxy configurations.
proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-1").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
proxyCfg3 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-3").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
// Create some proxy state templates.
p1 := resourcetest.Resource(types.ProxyStateTemplateType, "w-111").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
p2 := resourcetest.Resource(types.ProxyStateTemplateType, "w-222").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
p3 := resourcetest.Resource(types.ProxyStateTemplateType, "w-333").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
p4 := resourcetest.Resource(types.ProxyStateTemplateType, "w-444").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
p5 := resourcetest.Resource(types.ProxyStateTemplateType, "w-555").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
// Track these and make sure there's some overlap.
cache.TrackProxyConfiguration(proxyCfg1, []resource.ReferenceOrID{p1, p2, p4})
cache.TrackProxyConfiguration(proxyCfg2, []resource.ReferenceOrID{p3, p4, p5})
cache.TrackProxyConfiguration(proxyCfg3, []resource.ReferenceOrID{p1, p3})
// Read proxy configurations by proxy.
requireProxyConfigurations(t, cache, p1, proxyCfg1, proxyCfg3)
requireProxyConfigurations(t, cache, p2, proxyCfg1)
requireProxyConfigurations(t, cache, p3, proxyCfg2, proxyCfg3)
requireProxyConfigurations(t, cache, p4, proxyCfg1, proxyCfg2)
requireProxyConfigurations(t, cache, p5, proxyCfg2)
// Untrack some proxy IDs.
cache.UntrackProxyID(p1)
requireProxyConfigurations(t, cache, p1)
// Untrack some proxy IDs.
cache.UntrackProxyID(p3)
requireProxyConfigurations(t, cache, p3)
// Untrack proxy cfg.
cache.UntrackProxyConfiguration(proxyCfg1)
requireProxyConfigurations(t, cache, p1) // no-op because we untracked it earlier
requireProxyConfigurations(t, cache, p2)
requireProxyConfigurations(t, cache, p3) // no-op because we untracked it earlier
requireProxyConfigurations(t, cache, p4, proxyCfg2)
requireProxyConfigurations(t, cache, p5, proxyCfg2)
}
func requireProxyConfigurations(t *testing.T, cache *ProxyConfigurationCache, proxyID *pbresource.ID, proxyCfgs ...*pbresource.ID) {
t.Helper()
actualProxyCfgs := cache.ProxyConfigurationsByProxyID(proxyID)
require.Len(t, actualProxyCfgs, len(proxyCfgs))
prototest.AssertElementsMatch(t, proxyCfgs, actualProxyCfgs)
}

View File

@ -16,14 +16,19 @@ import (
type Dependencies struct {
TrustDomainFetcher sidecarproxy.TrustDomainFetcher
LocalDatacenter string
TrustBundleFetcher xds.TrustBundleFetcher
ProxyUpdater xds.ProxyUpdater
}
func Register(mgr *controller.Manager, deps Dependencies) {
c := sidecarproxycache.New()
m := sidecarproxymapper.New(c)
mapper := bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType)
mgr.Register(xds.Controller(mapper, deps.ProxyUpdater, deps.TrustBundleFetcher))
mgr.Register(sidecarproxy.Controller(c, m, deps.TrustDomainFetcher))
destinationsCache := sidecarproxycache.NewDestinationsCache()
proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache()
m := sidecarproxymapper.New(destinationsCache, proxyCfgCache)
mgr.Register(
sidecarproxy.Controller(destinationsCache, proxyCfgCache, m, deps.TrustDomainFetcher, deps.LocalDatacenter),
)
}

View File

@ -10,13 +10,25 @@ import (
type Builder struct {
id *pbresource.ID
proxyStateTemplate *pbmesh.ProxyStateTemplate
proxyCfg *pbmesh.ProxyConfiguration
trustDomain string
localDatacenter string
outboundListenerBuilder *ListenerBuilder
}
func New(id *pbresource.ID, identity *pbresource.Reference, trustDomain string) *Builder {
func New(id *pbresource.ID,
identity *pbresource.Reference,
trustDomain string,
dc string,
proxyCfg *pbmesh.ProxyConfiguration,
) *Builder {
return &Builder{
id: id,
trustDomain: trustDomain,
id: id,
trustDomain: trustDomain,
localDatacenter: dc,
proxyCfg: proxyCfg,
proxyStateTemplate: &pbmesh.ProxyStateTemplate{
ProxyState: &pbmesh.ProxyState{
Identity: identity,

View File

@ -4,8 +4,12 @@
package builder
import (
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh/internal/types/intermediate"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
@ -13,9 +17,17 @@ import (
)
func (b *Builder) BuildDestinations(destinations []*intermediate.Destination) *Builder {
if b.proxyCfg.GetDynamicConfig() != nil &&
b.proxyCfg.DynamicConfig.Mode == pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT {
b.addOutboundListener(b.proxyCfg.DynamicConfig.TransparentProxy.OutboundListenerPort)
}
for _, destination := range destinations {
if destination.Explicit != nil {
b.buildExplicitDestination(destination)
} else {
b.buildImplicitDestination(destination)
}
}
@ -47,6 +59,29 @@ func (b *Builder) buildExplicitDestination(destination *intermediate.Destination
}
}
func (b *Builder) buildImplicitDestination(destination *intermediate.Destination) {
serviceRef := resource.Reference(destination.ServiceEndpoints.Resource.Owner, "")
clusterName := DestinationClusterName(serviceRef, b.localDatacenter, b.trustDomain)
statPrefix := DestinationStatPrefix(serviceRef, b.localDatacenter)
// We assume that all endpoints have the same port protocol and name, and so it's sufficient
// to check ports just from the first endpoint.
if len(destination.ServiceEndpoints.Endpoints.Endpoints) > 0 {
// Find the destination proxy's port.
// Endpoints refs will need to route to mesh port instead of the destination port as that
// is the port of the destination's proxy.
meshPortName := findMeshPort(destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports)
for _, port := range destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports {
b.outboundListenerBuilder.
addRouterWithIPMatch(clusterName, statPrefix, port.Protocol, destination.VirtualIPs).
buildListener().
addCluster(clusterName, destination.Identities).
addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, meshPortName)
}
}
}
func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *ListenerBuilder {
listener := &pbproxystate.Listener{
Direction: pbproxystate.Direction_DIRECTION_OUTBOUND,
@ -77,18 +112,55 @@ func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Lis
return b.NewListenerBuilder(listener)
}
func (b *Builder) addOutboundListener(port uint32) *ListenerBuilder {
listener := &pbproxystate.Listener{
Name: xdscommon.OutboundListenerName,
Direction: pbproxystate.Direction_DIRECTION_OUTBOUND,
BindAddress: &pbproxystate.Listener_HostPort{
HostPort: &pbproxystate.HostPortAddress{
Host: "127.0.0.1",
Port: port,
},
},
Capabilities: []pbproxystate.Capability{pbproxystate.Capability_CAPABILITY_TRANSPARENT},
}
lb := b.NewListenerBuilder(listener)
// Save outbound listener builder so we can use it in the future.
b.outboundListenerBuilder = lb
return lb
}
func (l *ListenerBuilder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *ListenerBuilder {
return l.addRouterWithIPMatch(clusterName, statPrefix, protocol, nil)
}
func (l *ListenerBuilder) addRouterWithIPMatch(clusterName, statPrefix string, protocol pbcatalog.Protocol, vips []string) *ListenerBuilder {
// For explicit destinations, we have no filter chain match, and filters are based on port protocol.
router := &pbproxystate.Router{}
switch protocol {
case pbcatalog.Protocol_PROTOCOL_TCP:
router := &pbproxystate.Router{
Destination: &pbproxystate.Router_L4{
L4: &pbproxystate.L4Destination{
Name: clusterName,
StatPrefix: statPrefix,
},
router.Destination = &pbproxystate.Router_L4{
L4: &pbproxystate.L4Destination{
Name: clusterName,
StatPrefix: statPrefix,
},
}
}
if router.Destination != nil {
for _, vip := range vips {
if router.Match == nil {
router.Match = &pbproxystate.Match{}
}
router.Match.PrefixRanges = append(router.Match.PrefixRanges, &pbproxystate.CidrRange{
AddressPrefix: vip,
PrefixLen: &wrapperspb.UInt32Value{Value: 32},
})
}
l.listener.Routers = append(l.listener.Routers, router)
}
return l
@ -100,7 +172,7 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso
spiffeIDs = append(spiffeIDs, connect.SpiffeIDFromIdentityRef(b.trustDomain, identity))
}
// Create destination cluster
// Create destination cluster.
cluster := &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
@ -130,12 +202,11 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso
return b
}
func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) *Builder {
func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) {
b.proxyStateTemplate.RequiredEndpoints[clusterName] = &pbproxystate.EndpointRef{
Id: serviceEndpointsID,
Port: destinationPort,
}
return b
}
func findMeshPort(ports map[string]*pbcatalog.WorkloadPort) string {

View File

@ -96,7 +96,100 @@ func TestBuildExplicitDestinations(t *testing.T) {
}
for name, c := range cases {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul").
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil).
BuildDestinations(c.destinations).
Build()
actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name)
require.JSONEq(t, expected, actual)
}
}
func TestBuildImplicitDestinations(t *testing.T) {
api1Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-1").
WithOwner(
resourcetest.Resource(catalog.ServiceType, "api-1").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()).
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(t, endpointsData).Build()
api2Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-2").
WithOwner(resourcetest.Resource(catalog.ServiceType, "api-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()).
WithTenancy(resource.DefaultNamespacedTenancy()).
WithData(t, endpointsData).Build()
api1Identity := &pbresource.Reference{
Name: "api1-identity",
Tenancy: api1Endpoints.Id.Tenancy,
}
api2Identity := &pbresource.Reference{
Name: "api2-identity",
Tenancy: api2Endpoints.Id.Tenancy,
}
proxyCfg := &pbmesh.ProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
TransparentProxy: &pbmesh.TransparentProxy{
OutboundListenerPort: 15001,
},
},
}
destination1 := &intermediate.Destination{
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: api1Endpoints,
Endpoints: endpointsData,
},
Identities: []*pbresource.Reference{api1Identity},
VirtualIPs: []string{"1.1.1.1"},
}
destination2 := &intermediate.Destination{
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: api2Endpoints,
Endpoints: endpointsData,
},
Identities: []*pbresource.Reference{api2Identity},
VirtualIPs: []string{"2.2.2.2", "3.3.3.3"},
}
destination3 := &intermediate.Destination{
Explicit: &pbmesh.Upstream{
DestinationRef: resource.Reference(api1Endpoints.Id, ""),
DestinationPort: "tcp",
Datacenter: "dc1",
ListenAddr: &pbmesh.Upstream_IpPort{
IpPort: &pbmesh.IPPortAddress{Ip: "1.1.1.1", Port: 1234},
},
},
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: api1Endpoints,
Endpoints: endpointsData,
},
Identities: []*pbresource.Reference{api1Identity},
}
cases := map[string]struct {
destinations []*intermediate.Destination
}{
"l4-single-implicit-destination-tproxy": {
destinations: []*intermediate.Destination{destination1},
},
"l4-multiple-implicit-destinations-tproxy": {
destinations: []*intermediate.Destination{destination1, destination2},
},
"l4-implicit-and-explicit-destinations-tproxy": {
destinations: []*intermediate.Destination{destination2, destination3},
},
}
for name, c := range cases {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", proxyCfg).
BuildDestinations(c.destinations).
Build()

View File

@ -68,7 +68,8 @@ func TestBuildLocalApp(t *testing.T) {
for name, c := range cases {
t.Run(name, func(t *testing.T) {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul").BuildLocalApp(c.workload).
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil).
BuildLocalApp(c.workload).
Build()
actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name+".golden")

View File

@ -0,0 +1,199 @@
{
"proxyState": {
"clusters": {
"api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-1.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
}
}
}
}
}
},
"api-2.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-2.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api2-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "1.1.1.1",
"port": 1234
},
"name": "api-1:tcp:1.1.1.1:1234",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
}
}
]
}
]
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
},
"api-2.default.dc1.internal.foo.consul": {
"id": {
"name": "api-2",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -0,0 +1,122 @@
{
"proxyState": {
"clusters": {
"api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-1.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
}
}
}
}
}
},
"api-2.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-2.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api2-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "1.1.1.1",
"port": 1234
},
"name": "api-1:tcp:1.1.1.1:1234",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
}
}
]
},
{
"direction": "DIRECTION_OUTBOUND",
"name": "api-2:tcp:/path/to/socket",
"routers": [
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
}
}
],
"unixSocket": {
"mode": "0666",
"path": "/path/to/socket"
}
}
]
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
},
"api-2.default.dc1.internal.foo.consul": {
"id": {
"name": "api-2",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -0,0 +1,360 @@
{
"proxyState": {
"clusters": {
"api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-1.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
}
}
}
}
}
},
"api-2.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-2.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api2-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
},
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "2.2.2.2",
"prefixLen": 32
},
{
"addressPrefix": "3.3.3.3",
"prefixLen": 32
}
]
}
}
]
}
]
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
},
"api-2.default.dc1.internal.foo.consul": {
"id": {
"name": "api-2",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -0,0 +1,70 @@
{
"proxyState": {
"clusters": {
"api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-1.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "1.1.1.1",
"port": 1234
},
"name": "api-1:tcp:1.1.1.1:1234",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
}
}
]
}
]
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -0,0 +1,70 @@
{
"proxyState": {
"clusters": {
"api-2.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-2.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api2-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"direction": "DIRECTION_OUTBOUND",
"name": "api-2:tcp:/path/to/socket",
"routers": [
{
"l4": {
"name": "api-2.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-2.default.default.dc1"
}
}
],
"unixSocket": {
"mode": "0666",
"path": "/path/to/socket"
}
}
]
},
"requiredEndpoints": {
"api-2.default.dc1.internal.foo.consul": {
"id": {
"name": "api-2",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -0,0 +1,135 @@
{
"proxyState": {
"clusters": {
"api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"sni": "api-1.default.dc1.internal.foo.consul",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
}
}
}
}
}
}
},
"identity": {
"name": "test-identity",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
}
},
"listeners": [
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
}
]
},
{
"capabilities": [
"CAPABILITY_TRANSPARENT"
],
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"name": "outbound_listener",
"routers": [
{
"l4": {
"name": "api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.api-1.default.default.dc1"
},
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
]
}
}
]
}
]
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"tenancy": {
"namespace": "default",
"partition": "default",
"peerName": "local"
},
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
}
},
"port": "mesh"
}
}
}

View File

@ -26,20 +26,33 @@ const ControllerName = "consul.io/sidecar-proxy-controller"
type TrustDomainFetcher func() (string, error)
func Controller(cache *sidecarproxycache.Cache, mapper *sidecarproxymapper.Mapper, trustDomainFetcher TrustDomainFetcher) controller.Controller {
if cache == nil || mapper == nil || trustDomainFetcher == nil {
panic("cache, mapper and trust domain fetcher are required")
func Controller(destinationsCache *sidecarproxycache.DestinationsCache,
proxyCfgCache *sidecarproxycache.ProxyConfigurationCache,
mapper *sidecarproxymapper.Mapper,
trustDomainFetcher TrustDomainFetcher,
dc string) controller.Controller {
if destinationsCache == nil || proxyCfgCache == nil || mapper == nil || trustDomainFetcher == nil {
panic("destinations cache, proxy configuration cache, mapper and trust domain fetcher are required")
}
return controller.ForType(types.ProxyStateTemplateType).
WithWatch(catalog.ServiceEndpointsType, mapper.MapServiceEndpointsToProxyStateTemplate).
WithWatch(types.UpstreamsType, mapper.MapDestinationsToProxyStateTemplate).
WithReconciler(&reconciler{cache: cache, getTrustDomain: trustDomainFetcher})
WithWatch(types.ProxyConfigurationType, mapper.MapProxyConfigurationToProxyStateTemplate).
WithReconciler(&reconciler{
destinationsCache: destinationsCache,
proxyCfgCache: proxyCfgCache,
getTrustDomain: trustDomainFetcher,
dc: dc,
})
}
type reconciler struct {
cache *sidecarproxycache.Cache
getTrustDomain TrustDomainFetcher
destinationsCache *sidecarproxycache.DestinationsCache
proxyCfgCache *sidecarproxycache.ProxyConfigurationCache
getTrustDomain TrustDomainFetcher
dc string
}
func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
@ -48,7 +61,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
rt.Logger.Trace("reconciling proxy state template")
// Instantiate a data fetcher to fetch all reconciliation data.
dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache}
dataFetcher := fetcher.New(rt.Client, r.destinationsCache, r.proxyCfgCache)
// Check if the workload exists.
workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID)
@ -87,8 +100,8 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
return err
}
// Remove it from cache.
r.cache.DeleteSourceProxy(req.ID)
// Remove it from destinationsCache.
r.destinationsCache.DeleteSourceProxy(req.ID)
}
rt.Logger.Trace("skipping proxy state template generation because workload is not on the mesh", "workload", workload.Resource.Id)
return nil
@ -101,22 +114,39 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
return err
}
b := builder.New(req.ID, workloadIdentityRefFromWorkload(workload), trustDomain).
// Fetch proxy configuration.
proxyCfg, err := dataFetcher.FetchAndMergeProxyConfigurations(ctx, req.ID)
if err != nil {
rt.Logger.Error("error fetching proxy and merging proxy configurations", "error", err)
return err
}
b := builder.New(req.ID, identityRefFromWorkload(workload), trustDomain, r.dc, proxyCfg).
BuildLocalApp(workload.Workload)
// Get all destinationsData.
destinationsRefs := r.cache.DestinationsBySourceProxy(req.ID)
destinationsData, statuses, err := dataFetcher.FetchDestinationsData(ctx, destinationsRefs)
destinationsRefs := r.destinationsCache.DestinationsBySourceProxy(req.ID)
destinationsData, statuses, err := dataFetcher.FetchExplicitDestinationsData(ctx, destinationsRefs)
if err != nil {
rt.Logger.Error("error fetching destinations for this proxy", "error", err)
rt.Logger.Error("error fetching explicit destinations for this proxy", "error", err)
return err
}
if proxyCfg.IsTransparentProxy() {
destinationsData, err = dataFetcher.FetchImplicitDestinationsData(ctx, req.ID, destinationsData)
if err != nil {
rt.Logger.Error("error fetching implicit destinations for this proxy", "error", err)
return err
}
}
b.BuildDestinations(destinationsData)
newProxyTemplate := b.Build()
if proxyStateTemplate == nil || !proto.Equal(proxyStateTemplate.Tmpl, newProxyTemplate) {
if proxyStateTemplate == nil {
req.ID.Uid = ""
}
proxyTemplateData, err := anypb.New(newProxyTemplate)
if err != nil {
rt.Logger.Error("error creating proxy state template data", "error", err)
@ -161,7 +191,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
return nil
}
func workloadIdentityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference {
func identityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference {
return &pbresource.Reference{
Name: w.Workload.Identity,
Tenancy: w.Resource.Id.Tenancy,

View File

@ -1,13 +1,18 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
// SPDX-License-Identifier: BUSL-1.1
package sidecarproxy
import (
"context"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
@ -24,8 +29,6 @@ import (
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type meshControllerTestSuite struct {
@ -53,7 +56,8 @@ func (suite *meshControllerTestSuite) SetupTest() {
suite.ctx = testutil.TestContext(suite.T())
suite.ctl = &reconciler{
cache: sidecarproxycache.New(),
destinationsCache: sidecarproxycache.NewDestinationsCache(),
proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache(),
getTrustDomain: func() (string, error) {
return "test.consul", nil
},
@ -78,7 +82,8 @@ func (suite *meshControllerTestSuite) SetupTest() {
suite.apiService = resourcetest.Resource(catalog.ServiceType, "api-service").
WithData(suite.T(), &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}},
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}},
VirtualIps: []string{"1.1.1.1"},
Ports: []*pbcatalog.ServicePort{
{TargetPort: "tcp", Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
}}).
@ -140,7 +145,7 @@ func (suite *meshControllerTestSuite) SetupTest() {
Tenancy: suite.apiWorkloadID.Tenancy,
}
suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul").
suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul", "dc1", nil).
BuildLocalApp(suite.apiWorkload).
Build()
}
@ -255,17 +260,19 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_N
func (suite *meshControllerTestSuite) TestController() {
// This is a comprehensive test that checks the overall controller behavior as various resources change state.
// This should test interactions between the reconciler, the mappers, and the cache to ensure they work
// This should test interactions between the reconciler, the mappers, and the destinationsCache to ensure they work
// together and produce expected result.
// Run the controller manager
mgr := controller.NewManager(suite.client, suite.runtime.Logger)
c := sidecarproxycache.New()
m := sidecarproxymapper.New(c)
mgr.Register(Controller(c, m, func() (string, error) {
return "test.consul", nil
}))
// Initialize controller dependencies.
destinationsCache := sidecarproxycache.NewDestinationsCache()
proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache()
m := sidecarproxymapper.New(destinationsCache, proxyCfgCache)
trustDomainFetcher := func() (string, error) { return "test.consul", nil }
mgr.Register(Controller(destinationsCache, proxyCfgCache, m, trustDomainFetcher, "dc1"))
mgr.SetRaftLeader(true)
go mgr.Run(suite.ctx)
@ -273,96 +280,239 @@ func (suite *meshControllerTestSuite) TestController() {
apiProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc").ID()
webProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "web-def").ID()
// Check that proxy state template resource is generated for both the api and web workloads.
var webProxyStateTemplate *pbresource.Resource
retry.Run(suite.T(), func(r *retry.R) {
suite.client.RequireResourceExists(r, apiProxyStateTemplateID)
webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID)
var (
webProxyStateTemplate *pbresource.Resource
webDestinations *pbresource.Resource
)
testutil.RunStep(suite.T(), "proxy state template generation", func(t *testing.T) {
// Check that proxy state template resource is generated for both the api and web workloads.
retry.Run(t, func(r *retry.R) {
suite.client.RequireResourceExists(r, apiProxyStateTemplateID)
webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID)
})
})
// Add a source service and check that a new proxy state is generated.
webDestinations := resourcetest.Resource(types.UpstreamsType, "web-destinations").
WithData(suite.T(), &pbmesh.Upstreams{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}},
Upstreams: []*pbmesh.Upstream{
{
DestinationRef: resource.Reference(suite.apiService.Id, ""),
DestinationPort: "tcp",
testutil.RunStep(suite.T(), "add explicit destinations and check that new proxy state is generated", func(t *testing.T) {
// Add a source service and check that a new proxy state is generated.
webDestinations = resourcetest.Resource(types.UpstreamsType, "web-destinations").
WithData(suite.T(), &pbmesh.Upstreams{
Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}},
Upstreams: []*pbmesh.Upstream{
{
DestinationRef: resource.Reference(suite.apiService.Id, ""),
DestinationPort: "tcp",
ListenAddr: &pbmesh.Upstream_IpPort{
IpPort: &pbmesh.IPPortAddress{
Ip: "127.0.0.1",
Port: 1234,
},
},
},
},
},
}).Write(suite.T(), suite.client)
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
}).Write(suite.T(), suite.client)
webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version)
// Update destination's service endpoints and workload to be non-mesh
// and check that:
// * api's proxy state template is deleted
// * we get a new web proxy resource re-generated
// * the status on Upstreams resource is updated with a validation error
nonMeshPorts := map[string]*pbcatalog.WorkloadPort{
"tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
}
// Note: the order matters here because in reality service endpoints will only
// be reconciled after the workload has been updated, and so we need to write the
// workload before we write service endpoints.
suite.runtime.Logger.Trace("test: updating api-abc workload to be non-mesh")
resourcetest.Resource(catalog.WorkloadType, "api-abc").
WithData(suite.T(), &pbcatalog.Workload{
Identity: "api-identity",
Addresses: suite.apiWorkload.Addresses,
Ports: nonMeshPorts}).
Write(suite.T(), suite.client)
suite.runtime.Logger.Trace("test: updating api-service to be non-mesh")
resourcetest.Resource(catalog.ServiceEndpointsType, "api-service").
WithData(suite.T(), &pbcatalog.ServiceEndpoints{
Endpoints: []*pbcatalog.Endpoint{
{
TargetRef: suite.apiWorkloadID,
Addresses: suite.apiWorkload.Addresses,
Ports: nonMeshPorts,
Identity: "api-identity",
},
},
}).
Write(suite.T(), suite.client.ResourceServiceClient)
// Check that api proxy template is gone.
retry.Run(suite.T(), func(r *retry.R) {
suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID)
requireExplicitDestinationsFound(t, "api", webProxyStateTemplate)
})
// Check status on the pbmesh.Upstreams resource.
serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, ""))
suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName,
status.ConditionMeshProtocolNotFound(serviceRef))
testutil.RunStep(suite.T(), "update api's ports to be non-mesh", func(t *testing.T) {
// Update destination's service endpoints and workload to be non-mesh
// and check that:
// * api's proxy state template is deleted
// * we get a new web proxy resource re-generated
// * the status on Upstreams resource is updated with a validation error
nonMeshPorts := map[string]*pbcatalog.WorkloadPort{
"tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
}
// We should get a new web proxy template resource because this destination should be removed.
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
// Note: the order matters here because in reality service endpoints will only
// be reconciled after the workload has been updated, and so we need to write the
// workload before we write service endpoints.
resourcetest.Resource(catalog.WorkloadType, "api-abc").
WithData(suite.T(), &pbcatalog.Workload{
Identity: "api-identity",
Addresses: suite.apiWorkload.Addresses,
Ports: nonMeshPorts}).
Write(suite.T(), suite.client)
// Update destination's service apiEndpoints back to mesh and check that we get a new web proxy resource re-generated
// and that the status on Upstreams resource is updated to be empty.
resourcetest.Resource(catalog.ServiceEndpointsType, "api-service").
WithData(suite.T(), suite.apiEndpointsData).
Write(suite.T(), suite.client.ResourceServiceClient)
resourcetest.Resource(catalog.ServiceEndpointsType, "api-service").
WithData(suite.T(), &pbcatalog.ServiceEndpoints{
Endpoints: []*pbcatalog.Endpoint{
{
TargetRef: suite.apiWorkloadID,
Addresses: suite.apiWorkload.Addresses,
Ports: nonMeshPorts,
Identity: "api-identity",
},
},
}).
Write(suite.T(), suite.client.ResourceServiceClient)
suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName,
status.ConditionMeshProtocolFound(serviceRef))
// Check that api proxy template is gone.
retry.Run(t, func(r *retry.R) {
suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID)
})
// We should also get a new web proxy template resource as this destination should be added again.
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
// Check status on the pbmesh.Upstreams resource.
serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, ""))
suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName,
status.ConditionMeshProtocolNotFound(serviceRef))
// Delete the proxy state template resource and check that it gets regenerated.
_, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID})
require.NoError(suite.T(), err)
// We should get a new web proxy template resource because this destination should be removed.
webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version)
suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
requireExplicitDestinationsNotFound(t, "api", webProxyStateTemplate)
})
testutil.RunStep(suite.T(), "update ports to be mesh again", func(t *testing.T) {
// Update destination's service endpoints back to mesh and check that we get a new web proxy resource re-generated
// and that the status on Upstreams resource is updated to be empty.
suite.runtime.Logger.Trace("updating ports to mesh")
resourcetest.Resource(catalog.WorkloadType, "api-abc").
WithData(suite.T(), suite.apiWorkload).
Write(suite.T(), suite.client)
resourcetest.Resource(catalog.ServiceEndpointsType, "api-service").
WithData(suite.T(), suite.apiEndpointsData).
Write(suite.T(), suite.client.ResourceServiceClient)
serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, ""))
suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName,
status.ConditionMeshProtocolFound(serviceRef))
// We should also get a new web proxy template resource as this destination should be added again.
webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version)
requireExplicitDestinationsFound(t, "api", webProxyStateTemplate)
})
testutil.RunStep(suite.T(), "delete the proxy state template and check re-generation", func(t *testing.T) {
// Delete the proxy state template resource and check that it gets regenerated.
suite.runtime.Logger.Trace("deleting web proxy")
_, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID})
require.NoError(suite.T(), err)
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
requireExplicitDestinationsFound(t, "api", webProxyStateTemplate)
})
testutil.RunStep(suite.T(), "add implicit upstream and enable tproxy", func(t *testing.T) {
// Delete explicit destinations resource.
suite.runtime.Logger.Trace("deleting web destinations")
_, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webDestinations.Id})
require.NoError(t, err)
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
// Enable transparent proxy for the web proxy.
resourcetest.Resource(types.ProxyConfigurationType, "proxy-config").
WithData(t, &pbmesh.ProxyConfiguration{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"web"},
},
DynamicConfig: &pbmesh.DynamicConfig{
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
TransparentProxy: &pbmesh.TransparentProxy{
OutboundListenerPort: 15001,
},
},
}).Write(t, suite.client)
webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version)
requireImplicitDestinationsFound(t, "api", webProxyStateTemplate)
})
}
func TestMeshController(t *testing.T) {
suite.Run(t, new(meshControllerTestSuite))
}
func requireExplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) {
requireExplicitDestinations(t, name, tmplResource, true)
}
func requireExplicitDestinationsNotFound(t *testing.T, name string, tmplResource *pbresource.Resource) {
requireExplicitDestinations(t, name, tmplResource, false)
}
func requireExplicitDestinations(t *testing.T, name string, tmplResource *pbresource.Resource, found bool) {
t.Helper()
var tmpl pbmesh.ProxyStateTemplate
err := tmplResource.Data.UnmarshalTo(&tmpl)
require.NoError(t, err)
// Check outbound listener.
var foundListener bool
for _, l := range tmpl.ProxyState.Listeners {
if strings.Contains(l.Name, name) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND {
foundListener = true
break
}
}
require.Equal(t, found, foundListener)
requireClustersAndEndpoints(t, name, &tmpl, found)
}
func requireImplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) {
t.Helper()
var tmpl pbmesh.ProxyStateTemplate
err := tmplResource.Data.UnmarshalTo(&tmpl)
require.NoError(t, err)
// Check outbound listener.
var foundListener bool
for _, l := range tmpl.ProxyState.Listeners {
if strings.Contains(l.Name, xdscommon.OutboundListenerName) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND {
foundListener = true
// Check the listener filter chain
for _, r := range l.Routers {
destName := r.Destination.(*pbproxystate.Router_L4).L4.Name
if strings.Contains(destName, name) {
// We expect that there is a filter chain match for transparent proxy destinations.
require.NotNil(t, r.Match)
require.NotEmpty(t, r.Match.PrefixRanges)
break
}
}
break
}
}
require.True(t, foundListener)
requireClustersAndEndpoints(t, name, &tmpl, true)
}
func requireClustersAndEndpoints(t *testing.T, name string, tmpl *pbmesh.ProxyStateTemplate, found bool) {
t.Helper()
var foundCluster bool
for c := range tmpl.ProxyState.Clusters {
if strings.Contains(c, name) {
foundCluster = true
break
}
}
require.Equal(t, found, foundCluster)
var foundEndpoints bool
for c := range tmpl.RequiredEndpoints {
if strings.Contains(c, name) {
foundEndpoints = true
break
}
}
require.Equal(t, found, foundEndpoints)
}
func resourceID(rtype *pbresource.Type, name string) *pbresource.ID {
return &pbresource.ID{
Type: rtype,

View File

@ -3,6 +3,10 @@ package fetcher
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
ctrlStatus "github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/status"
@ -12,13 +16,23 @@ import (
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Fetcher struct {
Client pbresource.ResourceServiceClient
Cache *sidecarproxycache.Cache
Client pbresource.ResourceServiceClient
DestinationsCache *sidecarproxycache.DestinationsCache
ProxyCfgCache *sidecarproxycache.ProxyConfigurationCache
}
func New(client pbresource.ResourceServiceClient,
dCache *sidecarproxycache.DestinationsCache,
pcfgCache *sidecarproxycache.ProxyConfigurationCache) *Fetcher {
return &Fetcher{
Client: client,
DestinationsCache: dCache,
ProxyCfgCache: pcfgCache,
}
}
func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Workload, error) {
@ -28,7 +42,9 @@ func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*interm
case status.Code(err) == codes.NotFound:
// We also need to make sure to delete the associated proxy from cache.
// We are ignoring errors from cache here as this deletion is best effort.
f.Cache.DeleteSourceProxy(resource.ReplaceType(types.ProxyStateTemplateType, id))
proxyID := resource.ReplaceType(types.ProxyStateTemplateType, id)
f.DestinationsCache.DeleteSourceProxy(proxyID)
f.ProxyCfgCache.UntrackProxyID(proxyID)
return nil, nil
case err != nil:
return nil, err
@ -96,6 +112,30 @@ func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID)
return se, nil
}
func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Service, error) {
rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id})
switch {
case status.Code(err) == codes.NotFound:
return nil, nil
case err != nil:
return nil, err
}
se := &intermediateTypes.Service{
Resource: rsp.Resource,
}
var service pbcatalog.Service
err = rsp.Resource.Data.UnmarshalTo(&service)
if err != nil {
return nil, resource.NewErrDataParse(&service, err)
}
se.Service = &service
return se, nil
}
func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Destinations, error) {
rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id})
@ -120,9 +160,9 @@ func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*in
return u, nil
}
func (f *Fetcher) FetchDestinationsData(
func (f *Fetcher) FetchExplicitDestinationsData(
ctx context.Context,
destinationRefs []intermediateTypes.CombinedDestinationRef,
explDestRefs []intermediateTypes.CombinedDestinationRef,
) ([]*intermediateTypes.Destination, map[string]*intermediateTypes.Status, error) {
var (
@ -130,7 +170,7 @@ func (f *Fetcher) FetchDestinationsData(
statuses = make(map[string]*intermediateTypes.Status)
)
for _, dest := range destinationRefs {
for _, dest := range explDestRefs {
// Fetch Destinations resource if there is one.
us, err := f.FetchDestinations(ctx, dest.ExplicitDestinationsID)
if err != nil {
@ -141,11 +181,12 @@ func (f *Fetcher) FetchDestinationsData(
if us == nil {
// If the Destinations resource is not found, then we should delete it from cache and continue.
f.Cache.DeleteDestination(dest.ServiceRef, dest.Port)
f.DestinationsCache.DeleteDestination(dest.ServiceRef, dest.Port)
continue
}
d := &intermediateTypes.Destination{}
// As Destinations resource contains a list of destinations,
// we need to find the one that references our service and port.
d.Explicit = findDestination(dest.ServiceRef, dest.Port, us.Destinations)
@ -173,7 +214,7 @@ func (f *Fetcher) FetchDestinationsData(
d.ServiceEndpoints = se
// Check if this endpoints is mesh-enabled. If not, remove it from cache and return an error.
if !IsMeshEnabled(se.Endpoints.Endpoints[0].Ports) {
if len(se.Endpoints.Endpoints) > 0 && !IsMeshEnabled(se.Endpoints.Endpoints[0].Ports) {
// Add invalid status but don't remove from cache. If this state changes,
// we want to be able to detect this change.
updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID,
@ -190,7 +231,9 @@ func (f *Fetcher) FetchDestinationsData(
// No destination port should point to a port with "mesh" protocol,
// so check if destination port has the mesh protocol and update the status.
if se.Endpoints.Endpoints[0].Ports[dest.Port].Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
if len(se.Endpoints.Endpoints) > 0 &&
se.Endpoints.Endpoints[0].Ports[dest.Port].Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID,
us.Resource.Status, us.Resource.Generation, ctrlStatus.ConditionMeshProtocolDestinationPort(serviceRef, dest.Port))
continue
@ -217,6 +260,124 @@ func (f *Fetcher) FetchDestinationsData(
return destinations, statuses, nil
}
// FetchImplicitDestinationsData fetches all implicit destinations and adds them to existing destinations.
// If the implicit destination is already in addToDestinations, it will be skipped.
// todo (ishustava): this function will eventually need to fetch implicit destinations from the ImplicitDestinations resource instead.
func (f *Fetcher) FetchImplicitDestinationsData(ctx context.Context, proxyID *pbresource.ID, addToDestinations []*intermediateTypes.Destination) ([]*intermediateTypes.Destination, error) {
// First, convert existing destinations to a map so we can de-dup.
destinations := make(map[resource.ReferenceKey]*intermediateTypes.Destination)
for _, d := range addToDestinations {
destinations[resource.NewReferenceKey(d.ServiceEndpoints.Resource.Id)] = d
}
// For now, we need to look up all service endpoints within a partition.
rsp, err := f.Client.List(ctx, &pbresource.ListRequest{
Type: catalog.ServiceEndpointsType,
Tenancy: &pbresource.Tenancy{
Namespace: proxyID.Tenancy.Namespace,
Partition: proxyID.Tenancy.Partition,
PeerName: proxyID.Tenancy.PeerName,
},
})
if err != nil {
return nil, err
}
for _, r := range rsp.Resources {
// If it's already in destinations, ignore it.
if _, ok := destinations[resource.NewReferenceKey(r.Id)]; ok {
continue
}
var endpoints pbcatalog.ServiceEndpoints
err = r.Data.UnmarshalTo(&endpoints)
if err != nil {
return nil, err
}
// If this proxy is a part of this service, ignore it.
if isPartOfService(resource.ReplaceType(catalog.WorkloadType, proxyID), &endpoints) {
continue
}
// Skip if this service is not mesh-enabled.
if len(endpoints.Endpoints) > 0 && !IsMeshEnabled(endpoints.Endpoints[0].Ports) {
continue
}
// Collect all identities.
var identities []*pbresource.Reference
for _, ep := range endpoints.Endpoints {
identities = append(identities, &pbresource.Reference{
Name: ep.Identity,
Tenancy: r.Id.Tenancy,
})
}
// Fetch the service.
// todo (ishustava): this should eventually grab virtual IPs resource.
s, err := f.FetchService(ctx, resource.ReplaceType(catalog.ServiceType, r.Id))
if err != nil {
return nil, err
}
if s == nil {
// If service no longer exists, skip.
continue
}
d := &intermediateTypes.Destination{
ServiceEndpoints: &intermediateTypes.ServiceEndpoints{
Resource: r,
Endpoints: &endpoints,
},
VirtualIPs: s.Service.VirtualIps,
Identities: identities,
}
addToDestinations = append(addToDestinations, d)
}
return addToDestinations, err
}
// FetchAndMergeProxyConfigurations fetches proxy configurations for the proxy state template provided by id
// and merges them into one object.
func (f *Fetcher) FetchAndMergeProxyConfigurations(ctx context.Context, id *pbresource.ID) (*pbmesh.ProxyConfiguration, error) {
proxyCfgRefs := f.ProxyCfgCache.ProxyConfigurationsByProxyID(id)
result := &pbmesh.ProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{},
}
for _, ref := range proxyCfgRefs {
proxyCfgID := &pbresource.ID{
Name: ref.GetName(),
Type: ref.GetType(),
Tenancy: ref.GetTenancy(),
}
rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{
Id: proxyCfgID,
})
switch {
case status.Code(err) == codes.NotFound:
f.ProxyCfgCache.UntrackProxyConfiguration(proxyCfgID)
return nil, nil
case err != nil:
return nil, err
}
var proxyCfg pbmesh.ProxyConfiguration
err = rsp.Resource.Data.UnmarshalTo(&proxyCfg)
if err != nil {
return nil, err
}
// Note that we only care about dynamic config as bootstrap config
// will not be updated dynamically by this controller.
// todo (ishustava): do sorting etc.
proto.Merge(result.DynamicConfig, proxyCfg.DynamicConfig)
}
return result, nil
}
// IsMeshEnabled returns true if the workload or service endpoints port
// contain a port with the "mesh" protocol.
func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool {
@ -256,3 +417,17 @@ func updateStatusCondition(
}
}
}
func isPartOfService(workloadID *pbresource.ID, endpoints *pbcatalog.ServiceEndpoints) bool {
// convert IDs to refs so that we can compare without UIDs.
workloadRef := resource.Reference(workloadID, "")
for _, ep := range endpoints.Endpoints {
if ep.TargetRef != nil {
targetRef := resource.Reference(ep.TargetRef, "")
if resource.EqualReference(workloadRef, targetRef) {
return true
}
}
}
return false
}

View File

@ -176,7 +176,8 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() {
proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID()
// Create cache and pre-populate it.
c := sidecarproxycache.New()
destCache := sidecarproxycache.NewDestinationsCache()
proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache()
dest1 := intermediate.CombinedDestinationRef{
ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service-1").ReferenceNoSection(),
Port: "tcp",
@ -193,15 +194,19 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() {
resource.NewReferenceKey(proxyID): {},
},
}
c.WriteDestination(dest1)
c.WriteDestination(dest2)
destCache.WriteDestination(dest1)
destCache.WriteDestination(dest2)
f := Fetcher{Cache: c, Client: suite.client}
proxyCfgID := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config").ID()
proxyCfgCache.TrackProxyConfiguration(proxyCfgID, []resource.ReferenceOrID{proxyID})
f := Fetcher{DestinationsCache: destCache, ProxyCfgCache: proxyCfgCache, Client: suite.client}
_, err := f.FetchWorkload(context.Background(), proxyID)
require.NoError(suite.T(), err)
// Check that cache is updated to remove proxy id.
require.Nil(suite.T(), c.DestinationsBySourceProxy(proxyID))
require.Nil(suite.T(), destCache.DestinationsBySourceProxy(proxyID))
require.Nil(suite.T(), proxyCfgCache.ProxyConfigurationsByProxyID(proxyID))
}
func (suite *dataFetcherSuite) TestFetcher_NotFound() {
@ -236,6 +241,13 @@ func (suite *dataFetcherSuite) TestFetcher_NotFound() {
return err
},
},
"service": {
typ: catalog.ServiceType,
fetchFunc: func(id *pbresource.ID) error {
_, err := f.FetchService(context.Background(), id)
return err
},
},
}
for name, c := range cases {
@ -283,6 +295,13 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() {
return err
},
},
"service": {
name: "web-service",
fetchFunc: func(id *pbresource.ID) error {
_, err := f.FetchService(context.Background(), id)
return err
},
},
}
for name, c := range cases {
@ -313,7 +332,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() {
}
}
func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
func (suite *dataFetcherSuite) TestFetcher_FetchExplicitDestinationsData() {
destination1 := intermediate.CombinedDestinationRef{
ServiceRef: resource.Reference(suite.api1Service.Id, ""),
Port: "tcp",
@ -339,14 +358,14 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
},
}
c := sidecarproxycache.New()
c := sidecarproxycache.NewDestinationsCache()
c.WriteDestination(destination1)
c.WriteDestination(destination2)
c.WriteDestination(destination3)
f := Fetcher{
Cache: c,
Client: suite.client,
DestinationsCache: c,
Client: suite.client,
}
suite.T().Run("destinations not found", func(t *testing.T) {
@ -361,7 +380,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
c.WriteDestination(destinationRefNoDestinations)
destinationRefs := []intermediate.CombinedDestinationRef{destinationRefNoDestinations}
destinations, _, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
destinations, _, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
require.Nil(t, destinations)
_, foundDest := c.ReadDestination(destinationRefNoDestinations.ServiceRef, destinationRefNoDestinations.Port)
@ -383,7 +402,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
c.WriteDestination(destinationNoServiceEndpoints)
destinationRefs := []intermediate.CombinedDestinationRef{destinationNoServiceEndpoints}
destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
require.Nil(t, destinations)
@ -423,7 +442,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
c.WriteDestination(destinationNonMeshServiceEndpoints)
destinationRefs := []intermediate.CombinedDestinationRef{destinationNonMeshServiceEndpoints}
destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
require.Nil(t, destinations)
@ -458,7 +477,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
destinationRefs := []intermediate.CombinedDestinationRef{destination1}
destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
serviceRef := resource.ReferenceToString(destination1.ServiceRef)
destinationRef := resource.IDToString(destination1.ExplicitDestinationsID)
expectedStatus := &intermediate.Status{
@ -496,7 +515,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
},
}
_, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs)
_, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef])
})
@ -514,7 +533,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
c.WriteDestination(destinationMeshDestinationPort)
destinationRefs := []intermediate.CombinedDestinationRef{destinationMeshDestinationPort}
destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
serviceRef := resource.ReferenceToString(destination1.ServiceRef)
destinationRef := resource.IDToString(destination1.ExplicitDestinationsID)
expectedStatus := &intermediate.Status{
@ -553,7 +572,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
},
}
_, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs)
_, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef])
})
@ -610,7 +629,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
meshStatus.ConditionNonMeshProtocolDestinationPort(ref, d.Port))
}
actualDestinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs)
actualDestinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs)
require.NoError(t, err)
// Check that all statuses have "happy" conditions.
@ -622,6 +641,152 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() {
})
}
func (suite *dataFetcherSuite) TestFetcher_FetchImplicitDestinationsData() {
existingDestinations := []*intermediate.Destination{
{
Explicit: suite.webDestinationsData.Upstreams[0],
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: suite.api1ServiceEndpoints,
Endpoints: suite.api1ServiceEndpointsData,
},
Identities: []*pbresource.Reference{
{
Name: "api-1-identity",
Tenancy: suite.api1Service.Id.Tenancy,
},
},
},
{
Explicit: suite.webDestinationsData.Upstreams[1],
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: suite.api2ServiceEndpoints,
Endpoints: suite.api2ServiceEndpointsData,
},
Identities: []*pbresource.Reference{
{
Name: "api-2-identity",
Tenancy: suite.api2Service.Id.Tenancy,
},
},
},
{
Explicit: suite.webDestinationsData.Upstreams[2],
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: suite.api2ServiceEndpoints,
Endpoints: suite.api2ServiceEndpointsData,
},
Identities: []*pbresource.Reference{
{
Name: "api-2-identity",
Tenancy: suite.api2Service.Id.Tenancy,
},
},
},
}
// Create a few other services to be implicit upstreams.
api3Service := resourcetest.Resource(catalog.ServiceType, "api-3").
WithData(suite.T(), &pbcatalog.Service{
VirtualIps: []string{"192.1.1.1"},
}).
Write(suite.T(), suite.client)
api3ServiceEndpointsData := &pbcatalog.ServiceEndpoints{
Endpoints: []*pbcatalog.Endpoint{
{
TargetRef: &pbresource.ID{
Name: "api-3-abc",
Tenancy: api3Service.Id.Tenancy,
Type: catalog.WorkloadType,
},
Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{
"tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"mesh": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
},
Identity: "api-3-identity",
},
},
}
api3ServiceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-3").
WithData(suite.T(), api3ServiceEndpointsData).Write(suite.T(), suite.client)
f := Fetcher{
Client: suite.client,
}
expDestinations := append(existingDestinations, &intermediate.Destination{
ServiceEndpoints: &intermediate.ServiceEndpoints{
Resource: api3ServiceEndpoints,
Endpoints: api3ServiceEndpointsData,
},
Identities: []*pbresource.Reference{
{
Name: "api-3-identity",
Tenancy: api3Service.Id.Tenancy,
},
},
VirtualIPs: []string{"192.1.1.1"},
})
actualDestinations, err := f.FetchImplicitDestinationsData(context.Background(), suite.webProxy.Id, existingDestinations)
require.NoError(suite.T(), err)
prototest.AssertElementsMatch(suite.T(), expDestinations, actualDestinations)
}
func (suite *dataFetcherSuite) TestFetcher_FetchAndMergeProxyConfigurations() {
// Create some proxy configurations.
proxyCfg1Data := &pbmesh.ProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
},
}
proxyCfg2Data := &pbmesh.ProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT,
},
}
proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "config-1").
WithData(suite.T(), proxyCfg1Data).
Write(suite.T(), suite.client)
proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "config-2").
WithData(suite.T(), proxyCfg2Data).
Write(suite.T(), suite.client)
proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache()
proxyCfgCache.TrackProxyConfiguration(proxyCfg1.Id, []resource.ReferenceOrID{suite.webProxy.Id})
proxyCfgCache.TrackProxyConfiguration(proxyCfg2.Id, []resource.ReferenceOrID{suite.webProxy.Id})
expectedProxyCfg := &pbmesh.ProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT,
},
}
fetcher := Fetcher{Client: suite.client, ProxyCfgCache: proxyCfgCache}
actualProxyCfg, err := fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id)
require.NoError(suite.T(), err)
prototest.AssertDeepEqual(suite.T(), expectedProxyCfg, actualProxyCfg)
// Delete proxy cfg and check that the cache gets updated.
_, err = suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: proxyCfg1.Id})
require.NoError(suite.T(), err)
_, err = fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id)
require.NoError(suite.T(), err)
proxyCfg2.Id.Uid = ""
prototest.AssertElementsMatch(suite.T(),
[]*pbresource.ID{proxyCfg2.Id},
fetcher.ProxyCfgCache.ProxyConfigurationsByProxyID(suite.webProxy.Id))
}
func TestDataFetcher(t *testing.T) {
suite.Run(t, new(dataFetcherSuite))
}

View File

@ -3,9 +3,7 @@ package sidecarproxymapper
import (
"context"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/mesh/internal/types/intermediate"
"github.com/hashicorp/consul/internal/resource"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
@ -21,38 +19,15 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con
// Look up workloads for this destinations.
sourceProxyIDs := make(map[resource.ReferenceKey]struct{})
var result []controller.Request
for _, prefix := range destinations.Workloads.Prefixes {
resp, err := rt.Client.List(ctx, &pbresource.ListRequest{
Type: catalog.WorkloadType,
Tenancy: res.Id.Tenancy,
NamePrefix: prefix,
})
if err != nil {
return nil, err
}
for _, r := range resp.Resources {
proxyID := resource.ReplaceType(types.ProxyStateTemplateType, r.Id)
sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{}
result = append(result, controller.Request{
ID: proxyID,
})
}
requests, err := mapSelectorToProxyStateTemplates(ctx, rt.Client, destinations.Workloads, res.Id.Tenancy, func(id *pbresource.ID) {
sourceProxyIDs[resource.NewReferenceKey(id)] = struct{}{}
})
if err != nil {
return nil, err
}
for _, name := range destinations.Workloads.Names {
proxyID := &pbresource.ID{
Name: name,
Tenancy: res.Id.Tenancy,
Type: types.ProxyStateTemplateType,
}
sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{}
result = append(result, controller.Request{
ID: proxyID,
})
}
// Add this destination to cache.
// Add this destination to destinationsCache.
for _, destination := range destinations.Upstreams {
destinationRef := intermediate.CombinedDestinationRef{
ServiceRef: destination.DestinationRef,
@ -60,8 +35,8 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con
ExplicitDestinationsID: res.Id,
SourceProxies: sourceProxyIDs,
}
m.cache.WriteDestination(destinationRef)
m.destinationsCache.WriteDestination(destinationRef)
}
return result, nil
return requests, nil
}

View File

@ -64,8 +64,8 @@ func TestMapDestinationsToProxyStateTemplate(t *testing.T) {
WithData(t, webDestinationsData).
Write(t, client)
c := sidecarproxycache.New()
mapper := &Mapper{cache: c}
c := sidecarproxycache.NewDestinationsCache()
mapper := &Mapper{destinationsCache: c}
expRequests := []controller.Request{
{ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload1.Id)},

View File

@ -0,0 +1,67 @@
package sidecarproxymapper
import (
"context"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type Mapper struct {
destinationsCache *sidecarproxycache.DestinationsCache
proxyCfgCache *sidecarproxycache.ProxyConfigurationCache
}
func New(destinationsCache *sidecarproxycache.DestinationsCache, proxyCfgCache *sidecarproxycache.ProxyConfigurationCache) *Mapper {
return &Mapper{
destinationsCache: destinationsCache,
proxyCfgCache: proxyCfgCache,
}
}
// mapSelectorToProxyStateTemplates returns ProxyStateTemplate requests given a workload
// selector and tenancy. The cacheFunc can be called if the resulting ids need to be cached.
func mapSelectorToProxyStateTemplates(ctx context.Context,
client pbresource.ResourceServiceClient,
selector *pbcatalog.WorkloadSelector,
tenancy *pbresource.Tenancy,
cacheFunc func(id *pbresource.ID)) ([]controller.Request, error) {
var result []controller.Request
for _, prefix := range selector.Prefixes {
resp, err := client.List(ctx, &pbresource.ListRequest{
Type: catalog.WorkloadType,
Tenancy: tenancy,
NamePrefix: prefix,
})
if err != nil {
return nil, err
}
for _, r := range resp.Resources {
id := resource.ReplaceType(types.ProxyStateTemplateType, r.Id)
result = append(result, controller.Request{
ID: id,
})
cacheFunc(id)
}
}
for _, name := range selector.Names {
id := &pbresource.ID{
Name: name,
Tenancy: tenancy,
Type: types.ProxyStateTemplateType,
}
result = append(result, controller.Request{
ID: id,
})
cacheFunc(id)
}
return result, nil
}

View File

@ -0,0 +1,76 @@
package sidecarproxymapper
import (
"context"
"testing"
"github.com/stretchr/testify/require"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
)
func TestMapWorkloadsBySelector(t *testing.T) {
client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes)
// Create some workloads.
// For this test, we don't care about the workload data, so we will re-use
// the same data for all workloads.
workloadData := &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}},
}
w1 := resourcetest.Resource(catalog.WorkloadType, "w1").
WithData(t, workloadData).
Write(t, client).Id
w2 := resourcetest.Resource(catalog.WorkloadType, "w2").
WithData(t, workloadData).
Write(t, client).Id
w3 := resourcetest.Resource(catalog.WorkloadType, "prefix-w3").
WithData(t, workloadData).
Write(t, client).Id
w4 := resourcetest.Resource(catalog.WorkloadType, "prefix-w4").
WithData(t, workloadData).
Write(t, client).Id
// This workload should not be used as it's not selected by the workload selector.
resourcetest.Resource(catalog.WorkloadType, "not-selected-workload").
WithData(t, workloadData).
Write(t, client)
selector := &pbcatalog.WorkloadSelector{
Names: []string{"w1", "w2"},
Prefixes: []string{"prefix"},
}
expReqs := []controller.Request{
{ID: resource.ReplaceType(types.ProxyStateTemplateType, w1)},
{ID: resource.ReplaceType(types.ProxyStateTemplateType, w2)},
{ID: resource.ReplaceType(types.ProxyStateTemplateType, w3)},
{ID: resource.ReplaceType(types.ProxyStateTemplateType, w4)},
}
var cachedReqs []controller.Request
reqs, err := mapSelectorToProxyStateTemplates(context.Background(), client, selector, defaultTenancy(), func(id *pbresource.ID) {
// save IDs to check that the cache func is called
cachedReqs = append(cachedReqs, controller.Request{ID: id})
})
require.NoError(t, err)
require.Len(t, reqs, len(expReqs))
prototest.AssertElementsMatch(t, expReqs, reqs)
prototest.AssertElementsMatch(t, expReqs, cachedReqs)
}
func defaultTenancy() *pbresource.Tenancy {
return &pbresource.Tenancy{
Namespace: "default",
Partition: "default",
PeerName: "local",
}
}

View File

@ -0,0 +1,31 @@
package sidecarproxymapper
import (
"context"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
func (m *Mapper) MapProxyConfigurationToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
var proxyConfig pbmesh.ProxyConfiguration
err := res.Data.UnmarshalTo(&proxyConfig)
if err != nil {
return nil, err
}
var proxyIDs []resource.ReferenceOrID
requests, err := mapSelectorToProxyStateTemplates(ctx, rt.Client, proxyConfig.Workloads, res.Id.Tenancy, func(id *pbresource.ID) {
proxyIDs = append(proxyIDs, id)
})
if err != nil {
return nil, err
}
m.proxyCfgCache.TrackProxyConfiguration(res.Id, proxyIDs)
return requests, nil
}

View File

@ -0,0 +1,73 @@
package sidecarproxymapper
import (
"context"
"testing"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/stretchr/testify/require"
)
func TestProxyConfigurationMapper(t *testing.T) {
client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes)
// Create some workloads.
// For this test, we don't care about the workload data, so we will re-use
// the same data for all workloads.
workloadData := &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}},
Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}},
}
w1 := resourcetest.Resource(catalog.WorkloadType, "w1").
WithData(t, workloadData).
Write(t, client).Id
w2 := resourcetest.Resource(catalog.WorkloadType, "w2").
WithData(t, workloadData).
Write(t, client).Id
// Create proxy configuration.
proxyCfgData := &pbmesh.ProxyConfiguration{
Workloads: &pbcatalog.WorkloadSelector{
Names: []string{"w1", "w2"},
},
}
pCfg := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config").
WithData(t, proxyCfgData).
Write(t, client)
m := Mapper{proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache()}
reqs, err := m.MapProxyConfigurationToProxyStateTemplate(context.Background(), controller.Runtime{
Client: client,
}, pCfg)
require.NoError(t, err)
p1 := resource.ReplaceType(types.ProxyStateTemplateType, w1)
p2 := resource.ReplaceType(types.ProxyStateTemplateType, w2)
expReqs := []controller.Request{
{ID: p1},
{ID: p2},
}
prototest.AssertElementsMatch(t, expReqs, reqs)
// Check that the cache is populated.
// Clean out UID as we don't care about it in the cache.
pCfg.Id.Uid = ""
prototest.AssertElementsMatch(t,
[]*pbresource.ID{pCfg.Id},
m.proxyCfgCache.ProxyConfigurationsByProxyID(p1))
prototest.AssertElementsMatch(t,
[]*pbresource.ID{pCfg.Id},
m.proxyCfgCache.ProxyConfigurationsByProxyID(p2))
}

View File

@ -5,26 +5,16 @@ import (
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
type Mapper struct {
cache *sidecarproxycache.Cache
}
func New(c *sidecarproxycache.Cache) *Mapper {
return &Mapper{
cache: c,
}
}
// MapServiceEndpointsToProxyStateTemplate maps catalog.ServiceEndpoints objects to the IDs of
// ProxyStateTemplate.
func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
// This mapper needs to look up workload IDs from service endpoints and replace them with ProxyStateTemplate type.
var serviceEndpoints pbcatalog.ServiceEndpoints
err := res.Data.UnmarshalTo(&serviceEndpoints)
@ -68,7 +58,7 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co
continue
}
serviceRef := resource.Reference(serviceID, "")
if destination, ok := m.cache.ReadDestination(serviceRef, portName); ok {
if destination, ok := m.destinationsCache.ReadDestination(serviceRef, portName); ok {
for refKey := range destination.SourceProxies {
result = append(result, controller.Request{ID: refKey.ToID()})
}
@ -76,5 +66,24 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co
}
}
// todo (ishustava): this is a stub for now until we implement implicit destinations.
// For tproxy, we generate requests for all proxy states in the cluster.
// This will generate duplicate events for proxies already added above,
// however, we expect that the controller runtime will de-dup for us.
rsp, err := rt.Client.List(ctx, &pbresource.ListRequest{
Type: types.ProxyStateTemplateType,
Tenancy: &pbresource.Tenancy{
Namespace: storage.Wildcard,
Partition: res.Id.Tenancy.Partition,
PeerName: res.Id.Tenancy.PeerName,
},
})
if err != nil {
return nil, err
}
for _, r := range rsp.Resources {
result = append(result, controller.Request{ID: r.Id})
}
return result, err
}

View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/require"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache"
@ -18,6 +19,7 @@ import (
)
func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) {
client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes)
workload1 := resourcetest.Resource(catalog.WorkloadType, "workload-1").
WithTenancy(resource.DefaultNamespacedTenancy()).Build()
workload2 := resourcetest.Resource(catalog.WorkloadType, "workload-2").
@ -50,8 +52,8 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) {
proxyTmpl2ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-2").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
c := sidecarproxycache.New()
mapper := &Mapper{cache: c}
c := sidecarproxycache.NewDestinationsCache()
mapper := &Mapper{destinationsCache: c}
sourceProxy1 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-3").
WithTenancy(resource.DefaultNamespacedTenancy()).ID()
sourceProxy2 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-4").
@ -95,7 +97,7 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) {
{ID: sourceProxy3},
}
requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{}, serviceEndpoints)
requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{Client: client}, serviceEndpoints)
require.NoError(t, err)
prototest.AssertElementsMatch(t, expRequests, requests)
}

View File

@ -29,6 +29,11 @@ type ServiceEndpoints struct {
Endpoints *pbcatalog.ServiceEndpoints
}
type Service struct {
Resource *pbresource.Resource
Service *pbcatalog.Service
}
type Destinations struct {
Resource *pbresource.Resource
Destinations *pbmesh.Upstreams
@ -53,6 +58,7 @@ type Destination struct {
Explicit *pbmesh.Upstream
ServiceEndpoints *ServiceEndpoints
Identities []*pbresource.Reference
VirtualIPs []string
}
type Status struct {

View File

@ -0,0 +1,6 @@
package meshv1alpha1
func (p *ProxyConfiguration) IsTransparentProxy() bool {
return p.GetDynamicConfig() != nil &&
p.DynamicConfig.Mode == ProxyMode_PROXY_MODE_TRANSPARENT
}

View File

@ -0,0 +1,49 @@
package meshv1alpha1
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestIsTransprentProxy(t *testing.T) {
cases := map[string]struct {
proxyCfg *ProxyConfiguration
exp bool
}{
"nil dynamic config": {
proxyCfg: &ProxyConfiguration{},
exp: false,
},
"default mode": {
proxyCfg: &ProxyConfiguration{
DynamicConfig: &DynamicConfig{
Mode: ProxyMode_PROXY_MODE_DEFAULT,
},
},
exp: false,
},
"direct mode": {
proxyCfg: &ProxyConfiguration{
DynamicConfig: &DynamicConfig{
Mode: ProxyMode_PROXY_MODE_DEFAULT,
},
},
exp: false,
},
"transparent mode": {
proxyCfg: &ProxyConfiguration{
DynamicConfig: &DynamicConfig{
Mode: ProxyMode_PROXY_MODE_TRANSPARENT,
},
},
exp: true,
},
}
for name, c := range cases {
t.Run(name, func(t *testing.T) {
require.Equal(t, c.exp, c.proxyCfg.IsTransparentProxy())
})
}
}