Ensure passthrough addresses get cleaned up

Transparent proxies can set up filter chains that allow direct
connections to upstream service instances. Services that can be dialed
directly are stored in the PassthroughUpstreams map of the proxycfg
snapshot.

Previously, these addresses were never cleaned up in response to new
service health data: the set of addresses associated with an upstream
service would only ever grow.

As services scale up and down, instances will eventually be assigned an
IP address that previously belonged to a different service. When IP
addresses are duplicated across filter chain match rules, Envoy rejects
the listener config.

This commit updates the proxycfg snapshot management so that passthrough
addresses are cleaned up once they are no longer associated with a given
upstream.

There is still the possibility of a race condition, where, due to
timing, an address is shared between multiple passthrough upstreams.
That concern is mitigated by #12195, but will be further addressed
in a follow-up.
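The shape of the new bookkeeping is easiest to see outside the diff. Below is a minimal, runnable Go sketch of the two behaviors this commit introduces — rebuilding a target's address set from the latest health data, and pruning upstreams that are no longer watched. The types are simplified stand-ins (plain strings instead of the real UpstreamID struct), not Consul's actual API.

package main

import "fmt"

// upstream ID -> target ID -> set of passthrough IP addresses,
// mirroring map[UpstreamID]map[string]map[string]struct{} in the snapshot.
type passthroughUpstreams map[string]map[string]map[string]struct{}

// updateTarget mirrors the handleUpdateUpstreams change: the address set
// for a target is rebuilt from the latest health data, so stale IPs drop
// out instead of accumulating forever.
func updateTarget(p passthroughUpstreams, uid, tid string, addrs []string) {
	if _, ok := p[uid]; !ok {
		p[uid] = make(map[string]map[string]struct{})
	}
	p[uid][tid] = make(map[string]struct{}) // overwrite, don't append
	for _, a := range addrs {
		p[uid][tid][a] = struct{}{}
	}
}

// prune mirrors the handleUpdate change: drop upstreams no longer seen.
func prune(p passthroughUpstreams, seen map[string]struct{}) {
	for uid := range p {
		if _, ok := seen[uid]; !ok {
			delete(p, uid)
		}
	}
}

func main() {
	p := make(passthroughUpstreams)
	updateTarget(p, "db", "db.default.default.dc1", []string{"10.0.0.1", "10.0.0.2"})
	// 10.0.0.1 was reassigned elsewhere; the next health update omits it.
	updateTarget(p, "db", "db.default.default.dc1", []string{"10.0.0.2"})
	prune(p, map[string]struct{}{"db": {}})
	fmt.Println(p) // map[db:map[db.default.default.dc1:map[10.0.0.2:{}]]]
}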
freddygv 2022-01-27 20:52:26 -07:00
parent c31c1158a6
commit 659ebc05a9
13 changed files with 169 additions and 163 deletions

View File

@@ -27,7 +27,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
 	snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
 	snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
 	snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
-	snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs)
+	snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})

 	// Watch for root changes
 	err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
@@ -326,6 +326,12 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
 				delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
 			}
 		}
+		for uid := range snap.ConnectProxy.PassthroughUpstreams {
+			if _, ok := seenUpstreams[uid]; !ok {
+				delete(snap.ConnectProxy.PassthroughUpstreams, uid)
+			}
+		}
+
 		// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
 		// There have been situations where a discovery watch was cancelled, then fired.
 		// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up

View File

@@ -234,7 +234,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 				NewUpstreamID(&upstreams[1]): &upstreams[1],
 				NewUpstreamID(&upstreams[2]): &upstreams[2],
 			},
-			PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
+			PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
 		},
 		PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 		WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},
@@ -292,7 +292,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 				NewUpstreamID(&upstreams[1]): &upstreams[1],
 				NewUpstreamID(&upstreams[2]): &upstreams[2],
 			},
-			PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
+			PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
 		},
 		PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 		WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},

View File

@@ -45,6 +45,25 @@ func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID {
 	return id
 }

+func NewUpstreamIDFromTargetID(tid string) UpstreamID {
+	// Drop the leading subset if one is present in the target ID.
+	separators := strings.Count(tid, ".")
+	if separators > 3 {
+		prefix := tid[:strings.Index(tid, ".")+1]
+		tid = strings.TrimPrefix(tid, prefix)
+	}
+
+	split := strings.SplitN(tid, ".", 4)
+
+	id := UpstreamID{
+		Name:           split[0],
+		EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(split[2], split[1]),
+		Datacenter:     split[3],
+	}
+	id.normalize()
+	return id
+}
+
 func (u *UpstreamID) normalize() {
 	if u.Type == structs.UpstreamDestTypeService {
 		u.Type = ""
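As a quick illustration of the parsing above, here is a self-contained sketch (a plain tuple return instead of the real UpstreamID/EnterpriseMeta types) showing how a leading subset is stripped from a target ID:

package main

import (
	"fmt"
	"strings"
)

// A target ID is "name.namespace.partition.dc"; a five-part form
// "subset.name.namespace.partition.dc" carries a leading subset.
func parseTargetID(tid string) (name, namespace, partition, dc string) {
	if strings.Count(tid, ".") > 3 {
		tid = tid[strings.Index(tid, ".")+1:] // drop "subset."
	}
	split := strings.SplitN(tid, ".", 4)
	return split[0], split[1], split[2], split[3]
}

func main() {
	fmt.Println(parseTargetID("v1.foo.default.default.dc2")) // foo default default dc2
	fmt.Println(parseTargetID("foo.default.default.dc2"))    // foo default default dc2
}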

View File

@@ -8,6 +8,43 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 )

+// TODO(freddy): Needs enterprise test
+func TestUpstreamIDFromTargetID(t *testing.T) {
+	type testcase struct {
+		tid    string
+		expect UpstreamID
+	}
+	run := func(t *testing.T, tc testcase) {
+		tc.expect.EnterpriseMeta.Normalize()
+
+		got := NewUpstreamIDFromTargetID(tc.tid)
+		require.Equal(t, tc.expect, got)
+	}
+
+	cases := map[string]testcase{
+		"with subset": {
+			tid: "v1.foo.default.default.dc2",
+			expect: UpstreamID{
+				Name:       "foo",
+				Datacenter: "dc2",
+			},
+		},
+		"without subset": {
+			tid: "foo.default.default.dc2",
+			expect: UpstreamID{
+				Name:       "foo",
+				Datacenter: "dc2",
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			run(t, tc)
+		})
+	}
+}
+
 func TestUpstreamIDFromString(t *testing.T) {
 	type testcase struct {
 		id string

View File

@@ -9,7 +9,6 @@ import (
 	"github.com/mitchellh/copystructure"

 	"github.com/hashicorp/consul/acl"
-	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/structs"
 )
@@ -52,8 +51,9 @@ type ConfigSnapshotUpstreams struct {
 	// UpstreamConfig is a map to an upstream's configuration.
 	UpstreamConfig map[UpstreamID]*structs.Upstream

-	// PassthroughUpstreams is a map of: UpstreamID -> ServicePassthroughAddrs.
-	PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs
+	// PassthroughUpstreams is a map of: UpstreamID -> (map of TargetID ->
+	// (set of IP addresses)).
+	PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{}

 	// IntentionUpstreams is a set of upstreams inferred from intentions.
 	//
@@ -91,18 +91,6 @@ func gatewayKeyFromString(s string) GatewayKey {
 	return GatewayKey{Partition: split[0], Datacenter: split[1]}
 }

-// ServicePassthroughAddrs contains the LAN addrs
-type ServicePassthroughAddrs struct {
-	// SNI is the Service SNI of the upstream.
-	SNI string
-
-	// SpiffeID is the SPIFFE ID to use for upstream SAN validation.
-	SpiffeID connect.SpiffeIDService
-
-	// Addrs is a set of the best LAN addresses for the instances of the upstream.
-	Addrs map[string]struct{}
-}
-
 type configSnapshotConnectProxy struct {
 	ConfigSnapshotUpstreams

View File

@@ -12,7 +12,6 @@ import (
 	"github.com/hashicorp/consul/agent/cache"
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
-	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/consul/discoverychain"
 	"github.com/hashicorp/consul/agent/rpcclient/health"
 	"github.com/hashicorp/consul/agent/structs"
@@ -1985,17 +1984,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 				// The LAN service address is used below because transparent proxying
 				// does not support querying service nodes in other DCs, and the WAN address
 				// should not be used in DC-local calls.
-				require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
+				require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
 					dbUID: {
-						SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
-						SpiffeID: connect.SpiffeIDService{
-							Host:       snap.Roots.TrustDomain,
-							Namespace:  db.NamespaceOrDefault(),
-							Partition:  db.PartitionOrDefault(),
-							Datacenter: snap.Datacenter,
-							Service:    "db",
-						},
-						Addrs: map[string]struct{}{
+						"db.default.default.dc1": map[string]struct{}{
 							"10.10.10.10": {},
 							"10.0.0.2":    {},
 						},
@@ -2098,17 +2089,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 						},
 					},
 				)
-				require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
+				require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
 					dbUID: {
-						SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
-						SpiffeID: connect.SpiffeIDService{
-							Host:       snap.Roots.TrustDomain,
-							Namespace:  db.NamespaceOrDefault(),
-							Partition:  db.PartitionOrDefault(),
-							Datacenter: snap.Datacenter,
-							Service:    "db",
-						},
-						Addrs: map[string]struct{}{
+						"db.default.default.dc1": map[string]struct{}{
 							"10.0.0.2": {},
 						},
 					},

View File

@@ -9,8 +9,6 @@ import (
 	"github.com/mitchellh/mapstructure"

 	"github.com/hashicorp/consul/agent/cache"
-	"github.com/hashicorp/consul/agent/connect"
-
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/structs"
 )
@@ -92,56 +90,25 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
 		}
 		upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes

-		var passthroughAddrs map[string]ServicePassthroughAddrs
+		if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
+			return nil
+		}
+
+		if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; !ok {
+			upstreamsSnapshot.PassthroughUpstreams[uid] = make(map[string]map[string]struct{})
+		}
+		upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})

 		for _, node := range resp.Nodes {
-			if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
-				if passthroughAddrs == nil {
-					passthroughAddrs = make(map[string]ServicePassthroughAddrs)
-				}
-
-				svc := node.Service.CompoundServiceName()
-
-				// Overwrite the name if it's a connect proxy (as opposed to Connect native).
-				// We don't reference the proxy name directly for things like SNI, but rather the name
-				// of the destination. The enterprise meta of a proxy will always be the same as that of
-				// the destination service, so that remains intact.
-				if node.Service.Kind == structs.ServiceKindConnectProxy {
-					dst := node.Service.Proxy.DestinationServiceName
-					if dst == "" {
-						dst = node.Service.Proxy.DestinationServiceID
-					}
-					svc.Name = dst
-				}
-
-				sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), snap.Datacenter, snap.Roots.TrustDomain)
-
-				spiffeID := connect.SpiffeIDService{
-					Host:       snap.Roots.TrustDomain,
-					Partition:  svc.PartitionOrDefault(),
-					Namespace:  svc.NamespaceOrDefault(),
-					Datacenter: snap.Datacenter,
-					Service:    svc.Name,
-				}
-
-				svcUID := NewUpstreamIDFromServiceName(svc)
-				if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok {
-					upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{
-						SNI:      sni,
-						SpiffeID: spiffeID,
-
-						// Stored in a set because it's possible for these to be duplicated
-						// when the upstream-target is targeted by multiple discovery chains.
-						Addrs: make(map[string]struct{}),
-					}
-				}
+			if !node.Service.Proxy.TransparentProxy.DialedDirectly {
+				continue
+			}

 			// Make sure to use an external address when crossing partitions.
-				isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
+			// Datacenter is not considered because transparent proxies cannot dial other datacenters.
+			isRemote := !structs.EqualPartitions(node.Node.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
 			addr, _ := node.BestAddress(isRemote)

-				upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{}
-			}
+			upstreamsSnapshot.PassthroughUpstreams[uid][targetID][addr] = struct{}{}
 		}

 	case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):

View File

@@ -171,9 +171,15 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
 		})
 	}

-	for _, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
+	for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams {
+		for tid := range target {
+			uid := proxycfg.NewUpstreamIDFromTargetID(tid)
+
+			sni := connect.ServiceSNI(
+				uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
+
 			// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
-		name := "passthrough~" + passthrough.SNI
+			name := "passthrough~" + sni

 			c := envoy_cluster_v3.Cluster{
 				Name: name,
@@ -186,14 +192,22 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
 				ConnectTimeout: ptypes.DurationProto(5 * time.Second),
 			}

+			spiffeID := connect.SpiffeIDService{
+				Host:       cfgSnap.Roots.TrustDomain,
+				Partition:  uid.PartitionOrDefault(),
+				Namespace:  uid.NamespaceOrDefault(),
+				Datacenter: cfgSnap.Datacenter,
+				Service:    uid.Name,
+			}
+
 			commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf())
-		err := injectSANMatcher(commonTLSContext, passthrough.SpiffeID)
+			err := injectSANMatcher(commonTLSContext, spiffeID)
 			if err != nil {
-			return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", passthrough.SNI, err)
+				return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
 			}

 			tlsContext := envoy_tls_v3.UpstreamTlsContext{
 				CommonTlsContext: commonTLSContext,
-			Sni: passthrough.SNI,
+				Sni:              sni,
 			}

 			transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext)
 			if err != nil {
@@ -202,6 +216,7 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
 			c.TransportSocket = transportSocket
 			clusters = append(clusters, &c)
 		}
+	}

 	return clusters, nil
 }
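Since the SNI and SPIFFE ID are now re-derived from the target ID rather than carried in the snapshot, it may help to see the naming concretely. A small sketch, using the formats visible in the golden files below (default partition assumed; enterprise layouts differ):

package main

import "fmt"

func main() {
	name, namespace, dc := "kafka", "default", "dc1"
	trustDomain := "11111111-2222-3333-4444-555555555555.consul"

	// SNI for the upstream, plus the passthrough~ prefix that keeps the
	// cluster name distinct from the regular cluster for the same upstream.
	sni := fmt.Sprintf("%s.%s.%s.internal.%s", name, namespace, dc, trustDomain)
	cluster := "passthrough~" + sni

	// SPIFFE ID used for upstream SAN validation.
	spiffeID := fmt.Sprintf("spiffe://%s/ns/%s/dc/%s/svc/%s", trustDomain, namespace, dc, name)

	fmt.Println(cluster)
	// passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul
	fmt.Println(spiffeID)
	// spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/kafka
}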

View File

@@ -678,28 +678,14 @@ func TestClustersFromSnapshot(t *testing.T) {
 			}

 			// We add a passthrough cluster for each upstream service name
-			snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
+			snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
 				kafkaUID: {
-					SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-					SpiffeID: connect.SpiffeIDService{
-						Host:       "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-						Namespace:  "default",
-						Datacenter: "dc1",
-						Service:    "kafka",
-					},
-					Addrs: map[string]struct{}{
+					"kafka.default.default.dc1": map[string]struct{}{
 						"9.9.9.9": {},
 					},
 				},
 				mongoUID: {
-					SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-					SpiffeID: connect.SpiffeIDService{
-						Host:       "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-						Namespace:  "default",
-						Datacenter: "dc1",
-						Service:    "mongo",
-					},
-					Addrs: map[string]struct{}{
+					"mongo.default.default.dc1": map[string]struct{}{
 						"10.10.10.10": {},
 						"10.10.10.12": {},
 					},

View File

@@ -218,7 +218,13 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
 	// as opposed to via a virtual IP.
 	var passthroughChains []*envoy_listener_v3.FilterChain

-	for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
+	for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams {
+		for tid, addrs := range target {
+			uid := proxycfg.NewUpstreamIDFromTargetID(tid)
+
+			sni := connect.ServiceSNI(
+				uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
+
 			u := structs.Upstream{
 				DestinationName:      uid.Name,
 				DestinationNamespace: uid.NamespaceOrDefault(),
@@ -228,17 +234,18 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
 			filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter)

 			filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
-			clusterName: "passthrough~" + passthrough.SNI,
+				clusterName: "passthrough~" + sni,
 				filterName:  filterName,
 				protocol:    "tcp",
 			})
 			if err != nil {
 				return nil, err
 			}

-		filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(passthrough.Addrs)
+			filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(addrs)
 			passthroughChains = append(passthroughChains, filterChain)
 		}
+	}

 	outboundListener.FilterChains = append(outboundListener.FilterChains, passthroughChains...)

View File

@@ -1211,16 +1211,14 @@ func TestListenersFromSnapshot(t *testing.T) {
 			// We add a filter chain for each passthrough service name.
 			// The filter chain will route to a cluster with the same SNI name.
-			snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
+			snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
 				kafkaUID: {
-					SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-					Addrs: map[string]struct{}{
+					"kafka.default.default.dc1": map[string]struct{}{
 						"9.9.9.9": {},
 					},
 				},
 				mongoUID: {
-					SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-					Addrs: map[string]struct{}{
+					"mongo.default.default.dc1": map[string]struct{}{
 						"10.10.10.10": {},
 						"10.10.10.12": {},
 					},

View File

@@ -206,7 +206,7 @@
    },
    {
     "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
-    "name": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
+    "name": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
     "type": "ORIGINAL_DST",
     "connectTimeout": "5s",
     "lbPolicy": "CLUSTER_PROVIDED",
@@ -234,18 +234,18 @@
       },
       "matchSubjectAltNames": [
        {
-        "exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/kafka"
+        "exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/kafka"
        }
       ]
      }
     },
-    "sni": "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+    "sni": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
    }
   }
   },
   {
    "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
-   "name": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
+   "name": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
    "type": "ORIGINAL_DST",
    "connectTimeout": "5s",
    "lbPolicy": "CLUSTER_PROVIDED",
@@ -273,12 +273,12 @@
      },
      "matchSubjectAltNames": [
       {
-       "exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/mongo"
+       "exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
      }
     ]
    }
   },
-   "sni": "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+   "sni": "mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
   }
  }
 }

View File

@@ -55,7 +55,7 @@
     "typedConfig": {
      "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
      "statPrefix": "upstream.mongo.default.default.dc1",
-     "cluster": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+     "cluster": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
     }
    }
   ]
@@ -95,7 +95,7 @@
    "typedConfig": {
     "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
     "statPrefix": "upstream.kafka.default.default.dc1",
-    "cluster": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+    "cluster": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
    }
   }
  ]