peering: update how cross-peer upstreams are represented in proxycfg and rendered in xds (#13362)

This removes unnecessary, vestigial remnants of discovery chains.
R.B. Boyer 2022-06-03 16:42:50 -05:00 committed by GitHub
parent 74158a8aa2
commit 019aeaa57d
14 changed files with 471 additions and 82 deletions

View File

@@ -6,8 +6,6 @@ import (
"strings"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)
@@ -33,6 +31,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
snap.ConnectProxy.PeerUpstreamEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
// Watch for root changes
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
@@ -184,27 +183,31 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
fallthrough
case "":
// If DestinationPeer is not empty, insert a default discovery chain directly to the snapshot
if u.DestinationPeer != "" {
req := discoverychain.CompileRequest{
ServiceName: u.DestinationName,
EvaluateInNamespace: u.DestinationNamespace,
EvaluateInPartition: u.DestinationPartition,
EvaluateInDatacenter: dc,
EvaluateInTrustDomain: "trustdomain.consul", // TODO(peering): where to evaluate this?
Entries: configentry.NewDiscoveryChainSet(),
}
chain, err := discoverychain.Compile(req)
// NOTE: An upstream that points to a peer by definition will
// only ever watch a single catalog query, so a map key of just
// "UID" is sufficient to cover the peer data watches here.
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
// TODO(peering): We'll need to track a CancelFunc for this
// once the tproxy support lands.
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
PeerName: uid.Peer,
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: s.token,
},
ServiceName: u.DestinationName,
Connect: true,
// Note that Identifier doesn't type-prefix for service any more as it's
// the default and makes metrics and other things much cleaner. It's
// simpler for us if we have the type to make things unambiguous.
Source: *s.source,
EnterpriseMeta: uid.EnterpriseMeta,
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
if err != nil {
return snap, fmt.Errorf("error while compiling default discovery chain: %w", err)
}
// Directly insert chain and empty function into the discovery chain maps
snap.ConnectProxy.ConfigSnapshotUpstreams.DiscoveryChain[uid] = chain
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedDiscoveryChains[uid] = func() {}
if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil {
return snap, fmt.Errorf("error while resetting watches from chain: %w", err)
return snap, err
}
// Check whether a watch for this peer exists to avoid duplicates.
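A side note on the NOTE above: because a peered upstream only ever has that single catalog watch, the new snapshot storage can stay flat, whereas chain-based upstreams fan out to multiple targets and need double-keyed endpoints. A minimal sketch of the two map shapes, using simplified stand-in types rather than the real proxycfg/structs ones:

package main

import "fmt"

// Simplified stand-ins for the real proxycfg/structs types.
type UpstreamID struct{ Name, Peer string }
type CheckServiceNodes []struct{ Address string }

func main() {
	db := UpstreamID{Name: "db"}
	api := UpstreamID{Name: "api-a", Peer: "peer-a"}

	// Chain-based upstreams fan out to one or more targets, so
	// WatchedUpstreamEndpoints is keyed by upstream AND target name.
	watchedUpstreamEndpoints := map[UpstreamID]map[string]CheckServiceNodes{
		db: {"db.default.default.dc1": {{Address: "10.0.0.1"}}},
	}

	// A peered upstream watches exactly one catalog query, so
	// PeerUpstreamEndpoints needs only the UpstreamID key.
	peerUpstreamEndpoints := map[UpstreamID]CheckServiceNodes{
		api: {{Address: "10.0.0.9"}},
	}

	fmt.Println(len(watchedUpstreamEndpoints[db]), len(peerUpstreamEndpoints[api]))
}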

View File

@@ -1,11 +1,9 @@
package proxycfg
import (
"context"
"testing"
"time"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/require"
@@ -15,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil"
)
@@ -223,8 +222,6 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
dbUID: dbDefaultChain(),
},
WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"db.default.default.dc1": TestUpstreamNodes(t, db.Name),
@@ -239,10 +236,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
WatchedPeerTrustBundles: map[string]context.CancelFunc{},
PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
@@ -284,8 +281,6 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
dbUID: dbSplitChain(),
},
WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"v1.db.default.default.dc1": TestUpstreamNodes(t, db.Name),
@@ -301,10 +296,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
WatchedPeerTrustBundles: map[string]context.CancelFunc{},
PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
@@ -330,7 +325,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
dataSources.ConfigEntry.Set(meshConfigReq, &structs.ConfigEntryResponse{Entry: nil})
tt.setup(t, dataSources)
expectSnapCopy, err := copystructure.Copy(tt.expectSnap)
expectSnapCopy, err := tt.expectSnap.Clone()
require.NoError(t, err)
webProxyCopy, err := copystructure.Copy(webProxy)
@@ -341,7 +336,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
rootsReq, leafReq,
roots,
webProxyCopy.(*structs.NodeService),
expectSnapCopy.(*ConfigSnapshot),
expectSnapCopy,
)
})
}

View File

@@ -79,6 +79,11 @@ type ConfigSnapshotUpstreams struct {
//
// This list only applies to proxies registered in 'transparent' mode.
IntentionUpstreams map[UpstreamID]struct{}
// PeerUpstreamEndpoints is a map of UpstreamID -> CheckServiceNodes and
// is used to determine the backing endpoints of an upstream in another
// peer.
PeerUpstreamEndpoints map[UpstreamID]structs.CheckServiceNodes
}
// indexedTarget is used to associate the Raft modify index of a resource
@@ -156,7 +161,8 @@ func (c *configSnapshotConnectProxy) isEmpty() bool {
len(c.PassthroughUpstreams) == 0 &&
len(c.IntentionUpstreams) == 0 &&
!c.PeeringTrustBundlesSet &&
!c.MeshConfigSet
!c.MeshConfigSet &&
len(c.PeerUpstreamEndpoints) == 0
}
type configSnapshotTerminatingGateway struct {
@@ -516,8 +522,11 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
// nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches
switch s.Kind {
case structs.ServiceKindConnectProxy:
// common with connect-proxy and ingress-gateway
snap.ConnectProxy.WatchedUpstreams = nil
snap.ConnectProxy.WatchedGateways = nil
snap.ConnectProxy.WatchedDiscoveryChains = nil
snap.ConnectProxy.WatchedPeerTrustBundles = nil
case structs.ServiceKindTerminatingGateway:
snap.TerminatingGateway.WatchedServices = nil
snap.TerminatingGateway.WatchedIntentions = nil
@@ -528,9 +537,12 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
snap.MeshGateway.WatchedGateways = nil
snap.MeshGateway.WatchedServices = nil
case structs.ServiceKindIngressGateway:
// common with connect-proxy and ingress-gateway
snap.IngressGateway.WatchedUpstreams = nil
snap.IngressGateway.WatchedGateways = nil
snap.IngressGateway.WatchedDiscoveryChains = nil
snap.IngressGateway.WatchedPeerTrustBundles = nil
// only ingress-gateway
snap.IngressGateway.LeafCertWatchCancel = nil
}
@@ -585,3 +597,44 @@ func (s *ConfigSnapshot) MeshConfigTLSOutgoing() *structs.MeshDirectionalTLSConf
}
return mesh.TLS.Outgoing
}
func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta {
nodes := u.PeerUpstreamEndpoints[uid]
if len(nodes) == 0 {
return structs.PeeringServiceMeta{}
}
// In agent/rpc/peering/subscription_manager.go we denormalize the
// PeeringServiceMeta data onto each replicated service instance to convey
// this information back to the importing side of the peering.
//
// This data is guaranteed (subject to any eventual consistency lag around
// updates) to be the same across all instances, so we only need to take
// the first item.
//
// TODO(peering): consider replicating this "common to all instances" data
// using a different replication type and persist it separately in the
// catalog to avoid this weird construction.
csn := nodes[0]
if csn.Service == nil {
return structs.PeeringServiceMeta{}
}
return *csn.Service.Connect.PeerMeta
}
func (u *ConfigSnapshotUpstreams) PeeredUpstreamIDs() []UpstreamID {
out := make([]UpstreamID, 0, len(u.UpstreamConfig))
for uid := range u.UpstreamConfig {
if uid.Peer == "" {
continue
}
if _, ok := u.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok {
// The trust bundle for this upstream is not available yet, skip for now.
continue
}
out = append(out, uid)
}
return out
}
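
Because PeeringServiceMeta is denormalized onto every replicated instance, UpstreamPeerMeta deliberately reads only nodes[0]. A hedged sketch of that behavior from outside the package; the type, field, and method names are the real ones from this diff, while the uid literal and the data are invented (note the real code also assumes PeerMeta is non-nil on replicated instances):

package main

import (
	"fmt"

	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
)

func main() {
	uid := proxycfg.UpstreamID{Name: "api-a", Peer: "peer-a"}
	upstreams := proxycfg.ConfigSnapshotUpstreams{
		PeerUpstreamEndpoints: map[proxycfg.UpstreamID]structs.CheckServiceNodes{
			uid: {
				{Service: &structs.NodeService{Connect: structs.ServiceConnect{
					// Denormalized by the subscription manager, so it is
					// identical across instances; only nodes[0] is read.
					PeerMeta: &structs.PeeringServiceMeta{Protocol: "tcp"},
				}}},
			},
		},
	}

	meta := upstreams.UpstreamPeerMeta(uid) // == *nodes[0].Service.Connect.PeerMeta
	fmt.Println(meta.Protocol)              // tcp

	// With no endpoints stored yet, the zero value comes back, so
	// meta.PrimarySNI() would be "" and callers fall back gracefully.
}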

View File

@@ -36,6 +36,7 @@ const (
serviceResolverIDPrefix = "service-resolver:"
serviceIntentionsIDPrefix = "service-intentions:"
intentionUpstreamsID = "intention-upstreams"
upstreamPeerWatchIDPrefix = "upstream-peer:"
meshConfigEntryID = "mesh"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
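
The new upstream-peer prefix pairs the watch registration in the connect-proxy handler with its update handler: the correlation ID embeds UpstreamID.String(), and the handler trims the prefix to recover it. A minimal standalone round trip, with an illustrative stand-in for the rendered UpstreamID (the real code parses it back via UpstreamIDFromString):

package main

import (
	"fmt"
	"strings"
)

const upstreamPeerWatchIDPrefix = "upstream-peer:"

func main() {
	// Stand-in for whatever UpstreamID.String() renders for the upstream
	// "api-a" imported from "peer-a"; the exact format is illustrative.
	uidString := "api-a?peer=peer-a"

	// Registration side: prefix + uid forms the watch correlation ID.
	correlationID := upstreamPeerWatchIDPrefix + uidString

	// Handler side: trim the prefix to recover the uid string, which the
	// real handler feeds to UpstreamIDFromString.
	recovered := strings.TrimPrefix(correlationID, upstreamPeerWatchIDPrefix)
	fmt.Println(recovered == uidString) // true
}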

View File

@@ -299,10 +299,15 @@ func genVerifyGatewayWatch(expectedDatacenter string) verifyWatchRequest {
}
func genVerifyServiceSpecificRequest(expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest {
return genVerifyServiceSpecificPeeredRequest(expectedService, expectedFilter, expectedDatacenter, "", connect)
}
func genVerifyServiceSpecificPeeredRequest(expectedService, expectedFilter, expectedDatacenter, expectedPeer string, connect bool) verifyWatchRequest {
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ServiceSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
require.Equal(t, expectedPeer, reqReal.PeerName)
require.Equal(t, expectedService, reqReal.ServiceName)
require.Equal(t, expectedFilter, reqReal.QueryOptions.Filter)
require.Equal(t, connect, reqReal.Connect)
@@ -396,6 +401,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
rootWatchEvent := func() UpdateEvent {
return UpdateEvent{
CorrelationID: rootsWatchID,
@@ -2500,29 +2507,28 @@ func TestState_WatchesAndUpdates(t *testing.T) {
EvaluateInPartition: "default",
Datacenter: "dc1",
}),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("web"),
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
// No Peering watch
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("web"),
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "should not be valid")
require.True(t, snap.MeshGateway.isEmpty())
// Even though there were no events to trigger the watches,
// the peered upstream is written to the maps
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.NotNil(t, snap.ConnectProxy.DiscoveryChain[extApiUID])
require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 1, "%+v", snap.ConnectProxy.WatchedDiscoveryChains)
require.NotNil(t, snap.ConnectProxy.WatchedDiscoveryChains[extApiUID])
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 0, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 0, "%+v", snap.ConnectProxy.WatchedDiscoveryChains)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 0, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 0, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 0, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles)
require.Len(t, snap.ConnectProxy.PeerTrustBundles, 0, "%+v", snap.ConnectProxy.PeerTrustBundles)
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
require.Len(t, snap.ConnectProxy.PeeringTrustBundles, 0, "%+v", snap.ConnectProxy.PeeringTrustBundles)
@@ -2564,6 +2570,36 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Bundle: peerTrustBundles.Bundles[0],
},
},
{
CorrelationID: upstreamPeerWatchIDPrefix + extApiUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
PeerName: "peer-a",
},
Service: &structs.NodeService{
ID: "api-a-1",
Service: "api-a",
PeerName: "peer-a",
Connect: structs.ServiceConnect{
PeerMeta: &structs.PeeringServiceMeta{
SNI: []string{
"payments.default.default.cloud.external." + peerTrustDomain,
},
SpiffeID: []string{
"spiffe://" + peerTrustDomain + "/ns/default/dc/cloud-dc/svc/payments",
},
Protocol: "tcp",
},
},
},
},
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
@@ -2573,15 +2609,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles, snap.ConnectProxy.PeeringTrustBundles)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 2, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 2, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 2, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 2, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles)
require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.PeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedPeerTrustBundles)
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
require.NotNil(t, snap.ConnectProxy.PeerUpstreamEndpoints[extApiUID])
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
},

View File

@@ -24,6 +24,8 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot {
refundsUID = NewUpstreamID(&refundsUpstream)
)
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Upstreams = structs.Upstreams{
paymentsUpstream,
@@ -37,7 +39,7 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot {
},
},
{
CorrelationID: "upstream-target:payments.default.default.dc1:" + paymentsUID.String(),
CorrelationID: upstreamPeerWatchIDPrefix + paymentsUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: []structs.CheckServiceNode{
{
@@ -51,7 +53,13 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot {
Port: 443,
Connect: structs.ServiceConnect{
PeerMeta: &structs.PeeringServiceMeta{
SpiffeID: []string{"spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/payments"},
SNI: []string{
"payments.default.default.cloud.external." + peerTrustDomain,
},
SpiffeID: []string{
"spiffe://" + peerTrustDomain + "/ns/default/dc/cloud-dc/svc/payments",
},
Protocol: "tcp",
},
},
},
@@ -60,7 +68,7 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot {
},
},
{
CorrelationID: "upstream-target:refunds.default.default.dc1:" + refundsUID.String(),
CorrelationID: upstreamPeerWatchIDPrefix + refundsUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: []structs.CheckServiceNode{
{
@@ -74,7 +82,13 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot {
Port: 443,
Connect: structs.ServiceConnect{
PeerMeta: &structs.PeeringServiceMeta{
SpiffeID: []string{"spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/refunds"},
SNI: []string{
"refunds.default.default.cloud.external." + peerTrustDomain,
},
SpiffeID: []string{
"spiffe://" + peerTrustDomain + "/ns/default/dc/cloud-dc/svc/refunds",
},
Protocol: "tcp",
},
},
},

View File

@@ -88,6 +88,23 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
return err
}
case strings.HasPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
uidString := strings.TrimPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix)
uid := UpstreamIDFromString(uidString)
upstreamsSnapshot.PeerUpstreamEndpoints[uid] = resp.Nodes
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
return nil
}
s.logger.Warn("skipping transparent proxy update for peered upstream")
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {

View File

@@ -208,8 +208,17 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
Datacenter: m.config.Datacenter,
Service: sn.Name,
}
sni := connect.PeeredServiceSNI(
sn.Name,
sn.NamespaceOrDefault(),
sn.PartitionOrDefault(),
state.peerName,
m.trustDomain,
)
peerMeta := &pbservice.PeeringServiceMeta{
SNI: []string{sni},
SpiffeID: []string{spiffeID.URI().String()},
Protocol: "tcp",
}
// skip checks since we just generated one from scratch
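
The SNI entry assembled here is what later surfaces as both the cluster name and the TLS SNI in the xDS golden files further down. A hedged reimplementation of the name's shape, inferred from those goldens rather than from the real connect.PeeredServiceSNI:

package main

import (
	"fmt"
	"strings"
)

// Illustrative only; the real implementation is connect.PeeredServiceSNI.
// Shape inferred from the golden files in this commit:
//   <service>.<namespace>.<partition>.<peer>.external.<trust-domain>
func peeredServiceSNI(service, namespace, partition, peerName, trustDomain string) string {
	return strings.Join([]string{service, namespace, partition, peerName, "external", trustDomain}, ".")
}

func main() {
	fmt.Println(peeredServiceSNI("payments", "default", "default", "cloud",
		"1c053652-8512-4373-90cf-5a7f6263a994.consul"))
	// payments.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul
}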

View File

@@ -1253,6 +1253,13 @@ type PeeringServiceMeta struct {
Protocol string `json:",omitempty"`
}
func (m *PeeringServiceMeta) PrimarySNI() string {
if m == nil || len(m.SNI) == 0 {
return ""
}
return m.SNI[0]
}
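
PrimarySNI is deliberately nil-safe, which matters because replicated instances may carry no PeerMeta and UpstreamPeerMeta can return a zero value. A quick illustration with invented data:

package main

import (
	"fmt"

	"github.com/hashicorp/consul/agent/structs"
)

func main() {
	var m *structs.PeeringServiceMeta
	fmt.Println(m.PrimarySNI() == "") // true: nil receiver handled, no panic

	m = &structs.PeeringServiceMeta{SNI: []string{
		"payments.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
	}}
	fmt.Println(m.PrimarySNI()) // the first (primary) SNI entry
}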
func (ns *NodeService) BestAddress(wan bool) (string, int) {
addr := ns.Address
port := ns.Port

View File

@@ -88,10 +88,6 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok {
// The trust bundle for this upstream is not available yet, skip for now.
continue
}
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid]
if !ok {
@@ -109,6 +105,29 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
}
}
// NOTE: Any time we skip an upstream below we MUST also skip that same
// upstream in endpoints.go so that the sets of endpoints generated match
// the sets of clusters.
//
// TODO(peering): make this work for tproxy
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Not associated with a known explicit or implicit upstream so it is skipped.
continue
}
peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)
upstreamCluster, err := s.makeUpstreamClusterForPeerService(upstreamCfg, peerMeta, cfgSnap)
if err != nil {
return nil, err
}
clusters = append(clusters, upstreamCluster)
}
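
Per the NOTE above, this gate has to behave identically in clusters.go, endpoints.go, and listeners.go so the CDS and EDS resource sets agree. A hypothetical shared predicate (not in the commit, which repeats the check inline) that makes the invariant explicit:

package xds // hypothetical placement

import "github.com/hashicorp/consul/agent/proxycfg"

// shouldRenderPeeredUpstream mirrors the inline gate used by all three
// generators: render a peered upstream only when it is explicitly
// configured (local bind port/socket) or implicitly discovered via
// intentions in transparent mode.
func shouldRenderPeeredUpstream(cfgSnap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID) bool {
	upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] // assumed present, as in the loop above
	explicit := upstreamCfg.HasLocalPortOrSocket()
	_, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]
	return explicit || implicit
}
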
for _, u := range cfgSnap.Proxy.Upstreams {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
@@ -525,6 +544,96 @@ func (s *ResourceGenerator) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, nam
return c, err
}
func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
upstream *structs.Upstream,
peerMeta structs.PeeringServiceMeta,
cfgSnap *proxycfg.ConfigSnapshot,
) (*envoy_cluster_v3.Cluster, error) {
var (
c *envoy_cluster_v3.Cluster
err error
)
uid := proxycfg.NewUpstreamID(upstream)
cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstream, peerMeta)
if cfg.EnvoyClusterJSON != "" {
c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
if err != nil {
return c, err
}
// In the happy path don't return yet as we need to inject TLS config still.
}
// TODO(peering): if we replicated service metadata separately from the
// instances we wouldn't have to flip/flop this cluster name like this.
clusterName := peerMeta.PrimarySNI()
if clusterName == "" {
clusterName = uid.EnvoyID()
}
s.Logger.Trace("generating cluster for", "cluster", clusterName)
if c == nil {
c = &envoy_cluster_v3.Cluster{
Name: clusterName,
AltStatName: clusterName,
ConnectTimeout: durationpb.New(time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond),
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{
Value: 0, // disable panic threshold
},
},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
},
},
},
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
return c, err
}
}
}
rootPEMs := cfgSnap.RootPEMs()
if uid.Peer != "" {
rootPEMs = cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer].ConcatenatedRootPEMs()
}
// Enable TLS upstream with the configured client certificate.
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err = injectSANMatcher(commonTLSContext, peerMeta.SpiffeID...)
if err != nil {
return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", clusterName, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: peerMeta.PrimarySNI(),
}
transportSocket, err := makeUpstreamTLSTransportSocket(tlsContext)
if err != nil {
return nil, err
}
c.TransportSocket = transportSocket
return c, nil
}
func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy_cluster_v3.Cluster, error) {
var c *envoy_cluster_v3.Cluster
var err error
@@ -751,7 +860,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
}
sort.Strings(spiffeIDs)
s.Logger.Debug("generating cluster for", "cluster", clusterName)
s.Logger.Trace("generating cluster for", "cluster", clusterName)
c := &envoy_cluster_v3.Cluster{
Name: clusterName,
AltStatName: clusterName,

View File

@@ -45,8 +45,11 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
// TODO: this estimate is wrong
resources := make([]proto.Message, 0,
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+
len(cfgSnap.ConnectProxy.PeerUpstreamEndpoints)+
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
// so that the sets of endpoints generated match the sets of clusters.
@@ -58,10 +61,6 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok {
// The trust bundle for this upstream is not available yet, skip for now.
continue
}
es := s.endpointsFromDiscoveryChain(
uid,
@@ -74,6 +73,42 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
resources = append(resources, es...)
}
// NOTE: Any time we skip an upstream below we MUST also skip that same
// upstream in clusters.go so that the sets of endpoints generated match
// the sets of clusters.
//
// TODO(peering): make this work for tproxy
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Not associated with a known explicit or implicit upstream so it is skipped.
continue
}
peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)
// TODO(peering): if we replicated service metadata separately from the
// instances we wouldn't have to flip/flop this cluster name like this.
clusterName := peerMeta.PrimarySNI()
if clusterName == "" {
clusterName = uid.EnvoyID()
}
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid]
if ok {
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Locality,
)
resources = append(resources, la)
}
}
// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
for _, u := range cfgSnap.Proxy.Upstreams {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {

View File

@@ -11,9 +11,6 @@ import (
"strings"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
@@ -22,7 +19,6 @@
envoy_http_router_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
envoy_original_dst_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/original_dst/v3"
envoy_tls_inspector_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3"
envoy_connection_limit_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/connection_limit/v3"
envoy_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_sni_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/sni_cluster/v3"
@@ -35,6 +31,8 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
@@ -226,6 +224,73 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
}
// Looping over explicit upstreams is only needed for cross-peer because
// they do not have discovery chains.
//
// TODO(peering): make this work for tproxy
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Not associated with a known explicit or implicit upstream so it is skipped.
continue
}
peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)
cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstreamCfg, peerMeta)
// If escape hatch is present, create a listener from it and move on to the next
if cfg.EnvoyListenerJSON != "" {
upstreamListener, err := makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
if err != nil {
s.Logger.Error("failed to parse envoy_listener_json",
"upstream", uid,
"error", err)
continue
}
resources = append(resources, upstreamListener)
continue
}
// TODO(peering): if we replicated service metadata separately from the
// instances we wouldn't have to flip/flop this cluster name like this.
clusterName := peerMeta.PrimarySNI()
if clusterName == "" {
clusterName = uid.EnvoyID()
}
// Generate the upstream listeners for when they are explicitly set with a local bind port or socket path
if upstreamCfg != nil && upstreamCfg.HasLocalPortOrSocket() {
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
clusterName: clusterName,
filterName: uid.EnvoyID(),
routeName: uid.EnvoyID(),
protocol: cfg.Protocol,
useRDS: false,
})
if err != nil {
return nil, err
}
upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
resources = append(resources, upstreamListener)
// Avoid creating filter chains below for upstreams that have dedicated listeners
continue
}
// The rest of this loop is used exclusively for transparent proxies.
// Below we create a filter chain per upstream, rather than a listener per upstream
// as we do for explicit upstreams above.
// TODO(peering): tproxy
}
if outboundListener != nil {
// Add a passthrough for every mesh endpoint that can be dialed directly,
// as opposed to via a virtual IP.
@@ -1486,6 +1551,46 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(
return cfg
}
func (s *ResourceGenerator) getAndModifyUpstreamConfigForPeeredListener(
uid proxycfg.UpstreamID,
u *structs.Upstream,
peerMeta structs.PeeringServiceMeta,
) structs.UpstreamConfig {
var (
cfg structs.UpstreamConfig
err error
)
configMap := make(map[string]interface{})
if u != nil {
configMap = u.Config
}
cfg, err = structs.ParseUpstreamConfigNoDefaults(configMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
}
protocol := cfg.Protocol
if protocol == "" {
protocol = peerMeta.Protocol
}
if protocol == "" {
protocol = "tcp"
}
// set back on the config so that we can use it from the return value
cfg.Protocol = protocol
if cfg.ConnectTimeoutMs == 0 {
cfg.ConnectTimeoutMs = 5000
}
return cfg
}
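
The net effect of the function above is a three-level protocol precedence: local upstream config first, then the protocol the exporting peer replicated in PeerMeta, then plain tcp, plus a 5000ms connect-timeout default. The precedence, condensed into a standalone sketch:

package main

import "fmt"

// resolvePeeredProtocol condenses the precedence implemented above:
// local upstream config > peer-replicated PeerMeta.Protocol > "tcp".
func resolvePeeredProtocol(localCfg, peerReplicated string) string {
	if localCfg != "" {
		return localCfg
	}
	if peerReplicated != "" {
		return peerReplicated
	}
	return "tcp"
}

func main() {
	fmt.Println(resolvePeeredProtocol("", "grpc")) // grpc: the exporter's protocol is honored
	fmt.Println(resolvePeeredProtocol("http", "")) // http: the importer's local config wins
}
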
type listenerFilterOpts struct {
useRDS bool
protocol string

View File

@@ -28,8 +28,8 @@
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "payments.default.dc1.internal.trustdomain.consul",
"altStatName": "payments.default.dc1.internal.trustdomain.consul",
"name": "payments.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"altStatName": "payments.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
@@ -80,14 +80,14 @@
]
}
},
"sni": "payments.default.dc1.internal.trustdomain.consul"
"sni": "payments.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "refunds.default.dc1.internal.trustdomain.consul",
"altStatName": "refunds.default.dc1.internal.trustdomain.consul",
"name": "refunds.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"altStatName": "refunds.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
@@ -138,7 +138,7 @@
]
}
},
"sni": "refunds.default.dc1.internal.trustdomain.consul"
"sni": "refunds.default.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
}

View File

@@ -113,6 +113,8 @@ func MakePluginConfiguration(cfgSnap *proxycfg.ConfigSnapshot) PluginConfigurati
}
}
// TODO(peering): consider PeerUpstreamEndpoints in addition to DiscoveryChain
for uid, dc := range cfgSnap.ConnectProxy.DiscoveryChain {
if _, ok := connectProxies[uid]; !ok {
continue