From 74ca6406ea7569ad42e5c8ffb575b42e3e524238 Mon Sep 17 00:00:00 2001 From: Freddy Date: Wed, 1 Jun 2022 15:53:52 -0600 Subject: [PATCH] Configure upstream TLS context with peer root certs (#13321) For mTLS to work between two proxies in peered clusters with different root CAs, proxies need to configure their outbound listener to use different root certificates for validation. Up until peering was introduced proxies would only ever use one set of root certificates to validate all mesh traffic, both inbound and outbound. Now an upstream proxy may have a leaf certificate signed by a CA that's different from the dialing proxy's. This PR makes changes to proxycfg and xds so that the upstream TLS validation uses different root certificates depending on which cluster is being dialed. --- agent/agent.go | 5 +- .../mock_TrustBundleReader_test.go | 60 ++++++++++ agent/cache-types/trust_bundle.go | 51 +++++++++ agent/cache-types/trust_bundle_test.go | 104 ++++++++++++++++++ agent/connect/ca/common.go | 13 --- agent/connect/ca/provider_aws.go | 13 ++- agent/connect/ca/provider_vault.go | 15 +-- agent/consul/leader_connect_ca.go | 3 +- agent/consul/leader_connect_ca_test.go | 9 +- agent/consul/server_connect.go | 4 +- agent/proxycfg-glue/glue.go | 5 + agent/proxycfg/connect_proxy.go | 28 +++++ agent/proxycfg/data_sources.go | 25 +++++ agent/proxycfg/manager_test.go | 13 ++- agent/proxycfg/snapshot.go | 21 ++++ agent/proxycfg/state.go | 1 + agent/proxycfg/state_test.go | 27 ++++- agent/proxycfg/testing.go | 56 +++++++++- agent/proxycfg/testing_peering.go | 9 +- agent/xds/clusters.go | 24 ++-- agent/xds/endpoints.go | 6 + agent/xds/listeners.go | 29 ++--- agent/xds/listeners_ingress.go | 2 +- ...-proxy-with-peered-upstreams.latest.golden | 6 +- lib/strings.go | 18 +++ proto/pbpeering/peering.go | 47 ++++++++ 26 files changed, 521 insertions(+), 73 deletions(-) create mode 100644 agent/cache-types/mock_TrustBundleReader_test.go create mode 100644 agent/cache-types/trust_bundle.go create mode 100644 agent/cache-types/trust_bundle_test.go create mode 100644 lib/strings.go diff --git a/agent/agent.go b/agent/agent.go index 6b0911ba5b..cb6ba1a940 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -645,6 +645,7 @@ func (a *Agent) Start(ctx context.Context) error { PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache), ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache), ServiceList: proxycfgglue.CacheServiceList(a.cache), + TrustBundle: proxycfgglue.CacheTrustBundle(a.cache), } a.fillEnterpriseProxyDataSources(&proxyDataSources) a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ @@ -3819,7 +3820,7 @@ func (a *Agent) reloadConfig(autoReload bool) error { // breaking some existing behavior. newCfg.NodeID = a.config.NodeID - //if auto reload is enabled, make sure we have the right certs file watched. + // if auto reload is enabled, make sure we have the right certs file watched. if autoReload { for _, f := range []struct { oldCfg tlsutil.ProtocolConfig @@ -4097,6 +4098,8 @@ func (a *Agent) registerCache() { a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{Agent: a}) + a.cache.RegisterType(cachetype.TrustBundleReadName, &cachetype.TrustBundle{Client: a.rpcClientPeering}) + a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{RPC: a}) diff --git a/agent/cache-types/mock_TrustBundleReader_test.go b/agent/cache-types/mock_TrustBundleReader_test.go new file mode 100644 index 0000000000..7ea636b3d7 --- /dev/null +++ b/agent/cache-types/mock_TrustBundleReader_test.go @@ -0,0 +1,60 @@ +// Code generated by mockery v2.12.2. DO NOT EDIT. + +package cachetype + +import ( + context "context" + + grpc "google.golang.org/grpc" + + mock "github.com/stretchr/testify/mock" + + pbpeering "github.com/hashicorp/consul/proto/pbpeering" + + testing "testing" +) + +// MockTrustBundleReader is an autogenerated mock type for the TrustBundleReader type +type MockTrustBundleReader struct { + mock.Mock +} + +// TrustBundleRead provides a mock function with given fields: ctx, in, opts +func (_m *MockTrustBundleReader) TrustBundleRead(ctx context.Context, in *pbpeering.TrustBundleReadRequest, opts ...grpc.CallOption) (*pbpeering.TrustBundleReadResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *pbpeering.TrustBundleReadResponse + if rf, ok := ret.Get(0).(func(context.Context, *pbpeering.TrustBundleReadRequest, ...grpc.CallOption) *pbpeering.TrustBundleReadResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pbpeering.TrustBundleReadResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *pbpeering.TrustBundleReadRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTrustBundleReader(t testing.TB) *MockTrustBundleReader { + mock := &MockTrustBundleReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/cache-types/trust_bundle.go b/agent/cache-types/trust_bundle.go new file mode 100644 index 0000000000..6539a1ae87 --- /dev/null +++ b/agent/cache-types/trust_bundle.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "context" + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/proto/pbpeering" + "google.golang.org/grpc" +) + +// Recommended name for registration. +const TrustBundleReadName = "peer-trust-bundle" + +// TrustBundle supports fetching discovering service instances via prepared +// queries. +type TrustBundle struct { + RegisterOptionsNoRefresh + Client TrustBundleReader +} + +//go:generate mockery --name TrustBundleReader --inpackage --testonly +type TrustBundleReader interface { + TrustBundleRead( + ctx context.Context, in *pbpeering.TrustBundleReadRequest, opts ...grpc.CallOption, + ) (*pbpeering.TrustBundleReadResponse, error) +} + +func (t *TrustBundle) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a TrustBundleReadRequest. + // We do not need to make a copy of this request type like in other cache types + // because the RequestInfo is synthetic. + reqReal, ok := req.(*pbpeering.TrustBundleReadRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Fetch + reply, err := t.Client.TrustBundleRead(context.Background(), reqReal) + if err != nil { + return result, err + } + + result.Value = reply + result.Index = reply.Index + + return result, nil +} diff --git a/agent/cache-types/trust_bundle_test.go b/agent/cache-types/trust_bundle_test.go new file mode 100644 index 0000000000..8e18e69fe7 --- /dev/null +++ b/agent/cache-types/trust_bundle_test.go @@ -0,0 +1,104 @@ +package cachetype + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/proto/pbpeering" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestTrustBundles(t *testing.T) { + client := NewMockTrustBundleReader(t) + typ := &TrustBundle{Client: client} + + resp := &pbpeering.TrustBundleReadResponse{ + Index: 48, + Bundle: &pbpeering.PeeringTrustBundle{ + PeerName: "peer1", + RootPEMs: []string{"peer1-roots"}, + }, + } + + // Expect the proper call. + // This also returns the canned response above. + client.On("TrustBundleRead", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbpeering.TrustBundleReadRequest) + require.Equal(t, "foo", req.Name) + }). + Return(resp, nil) + + // Fetch and assert against the result. + result, err := typ.Fetch(cache.FetchOptions{}, &pbpeering.TrustBundleReadRequest{ + Name: "foo", + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, result) +} + +func TestTrustBundles_badReqType(t *testing.T) { + client := pbpeering.NewPeeringServiceClient(nil) + typ := &TrustBundle{Client: client} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") +} + +// This test asserts that we can continuously poll this cache type, given that it doesn't support blocking. +func TestTrustBundles_MultipleUpdates(t *testing.T) { + c := cache.New(cache.Options{}) + + client := NewMockTrustBundleReader(t) + + // On each mock client call to TrustBundleList by service we will increment the index by 1 + // to simulate new data arriving. + resp := &pbpeering.TrustBundleReadResponse{ + Index: uint64(0), + } + + client.On("TrustBundleRead", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbpeering.TrustBundleReadRequest) + require.Equal(t, "foo", req.Name) + + // Increment on each call. + resp.Index++ + }). + Return(resp, nil) + + c.RegisterType(TrustBundleReadName, &TrustBundle{Client: client}) + + ch := make(chan cache.UpdateEvent) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + err := c.Notify(ctx, TrustBundleReadName, &pbpeering.TrustBundleReadRequest{Name: "foo"}, "updates", ch) + require.NoError(t, err) + + i := uint64(1) + for { + select { + case <-ctx.Done(): + return + case update := <-ch: + // Expect to receive updates for increasing indexes serially. + resp := update.Result.(*pbpeering.TrustBundleReadResponse) + require.Equal(t, i, resp.Index) + i++ + + if i > 3 { + return + } + } + } +} diff --git a/agent/connect/ca/common.go b/agent/connect/ca/common.go index cef412bd3e..848a4fa7b1 100644 --- a/agent/connect/ca/common.go +++ b/agent/connect/ca/common.go @@ -4,7 +4,6 @@ import ( "bytes" "crypto/x509" "fmt" - "strings" "github.com/hashicorp/consul/agent/connect" ) @@ -92,15 +91,3 @@ func validateSignIntermediate(csr *x509.CertificateRequest, spiffeID *connect.Sp } return nil } - -// EnsureTrailingNewline this is used to fix a case where the provider do not return a new line after -// the certificate as per the specification see GH-8178 for more context -func EnsureTrailingNewline(cert string) string { - if cert == "" { - return cert - } - if strings.HasSuffix(cert, "\n") { - return cert - } - return fmt.Sprintf("%s\n", cert) -} diff --git a/agent/connect/ca/provider_aws.go b/agent/connect/ca/provider_aws.go index 25786ab409..cef0c7ddbe 100644 --- a/agent/connect/ca/provider_aws.go +++ b/agent/connect/ca/provider_aws.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" ) const ( @@ -363,15 +364,15 @@ func (a *AWSProvider) loadCACerts() error { if a.isPrimary { // Just use the cert as a root - a.rootPEM = EnsureTrailingNewline(*output.Certificate) + a.rootPEM = lib.EnsureTrailingNewline(*output.Certificate) } else { - a.intermediatePEM = EnsureTrailingNewline(*output.Certificate) + a.intermediatePEM = lib.EnsureTrailingNewline(*output.Certificate) // TODO(banks) support user-supplied CA being a Subordinate even in the // primary DC. For now this assumes there is only one cert in the chain if output.CertificateChain == nil { return fmt.Errorf("Subordinate CA %s returned no chain", a.arn) } - a.rootPEM = EnsureTrailingNewline(*output.CertificateChain) + a.rootPEM = lib.EnsureTrailingNewline(*output.CertificateChain) } return nil } @@ -489,7 +490,7 @@ func (a *AWSProvider) signCSR(csrPEM string, templateARN string, ttl time.Durati } if certOutput.Certificate != nil { - return true, EnsureTrailingNewline(*certOutput.Certificate), nil + return true, lib.EnsureTrailingNewline(*certOutput.Certificate), nil } return false, "", nil @@ -532,8 +533,8 @@ func (a *AWSProvider) SetIntermediate(intermediatePEM string, rootPEM string) er } // We successfully initialized, keep track of the root and intermediate certs. - a.rootPEM = EnsureTrailingNewline(rootPEM) - a.intermediatePEM = EnsureTrailingNewline(intermediatePEM) + a.rootPEM = lib.EnsureTrailingNewline(rootPEM) + a.intermediatePEM = lib.EnsureTrailingNewline(intermediatePEM) return nil } diff --git a/agent/connect/ca/provider_vault.go b/agent/connect/ca/provider_vault.go index 5c0c76608e..270d53a019 100644 --- a/agent/connect/ca/provider_vault.go +++ b/agent/connect/ca/provider_vault.go @@ -13,14 +13,15 @@ import ( "sync" "time" - "github.com/hashicorp/consul/lib/decode" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-hclog" vaultapi "github.com/hashicorp/vault/api" "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/decode" + "github.com/hashicorp/consul/lib/retry" ) const ( @@ -506,7 +507,7 @@ func (v *VaultProvider) getCA(namespace, path string) (string, error) { return "", err } - root := EnsureTrailingNewline(string(bytes)) + root := lib.EnsureTrailingNewline(string(bytes)) if root == "" { return "", ErrBackendNotInitialized } @@ -535,7 +536,7 @@ func (v *VaultProvider) getCAChain(namespace, path string) (string, error) { return "", err } - root := EnsureTrailingNewline(string(raw)) + root := lib.EnsureTrailingNewline(string(raw)) return root, nil } @@ -600,7 +601,7 @@ func (v *VaultProvider) Sign(csr *x509.CertificateRequest) (string, error) { if !ok { return "", fmt.Errorf("certificate was not a string") } - return EnsureTrailingNewline(cert), nil + return lib.EnsureTrailingNewline(cert), nil } // SignIntermediate returns a signed CA certificate with a path length constraint @@ -637,7 +638,7 @@ func (v *VaultProvider) SignIntermediate(csr *x509.CertificateRequest) (string, return "", fmt.Errorf("signed intermediate result is not a string") } - return EnsureTrailingNewline(intermediate), nil + return lib.EnsureTrailingNewline(intermediate), nil } // CrossSignCA takes a CA certificate and cross-signs it to form a trust chain @@ -677,7 +678,7 @@ func (v *VaultProvider) CrossSignCA(cert *x509.Certificate) (string, error) { return "", fmt.Errorf("certificate was not a string") } - return EnsureTrailingNewline(xcCert), nil + return lib.EnsureTrailingNewline(xcCert), nil } // SupportsCrossSigning implements Provider diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 2239bc6fd4..53b1856789 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/routine" ) @@ -1522,7 +1523,7 @@ func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID conne // Append any intermediates needed by this root. for _, p := range caRoot.IntermediateCerts { - pem = pem + ca.EnsureTrailingNewline(p) + pem = pem + lib.EnsureTrailingNewline(p) } modIdx, err := c.delegate.ApplyCALeafRequest() diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index e30ded8edd..37756eb204 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1001,7 +1002,7 @@ func generateExternalRootCA(t *testing.T, client *vaultapi.Client) string { "ttl": "2400h", }) require.NoError(t, err, "failed to generate root") - return ca.EnsureTrailingNewline(resp.Data["certificate"].(string)) + return lib.EnsureTrailingNewline(resp.Data["certificate"].(string)) } func setupPrimaryCA(t *testing.T, client *vaultapi.Client, path string, rootPEM string) string { @@ -1033,12 +1034,12 @@ func setupPrimaryCA(t *testing.T, client *vaultapi.Client, path string, rootPEM require.NoError(t, err, "failed to sign intermediate") var buf strings.Builder - buf.WriteString(ca.EnsureTrailingNewline(intermediate.Data["certificate"].(string))) - buf.WriteString(ca.EnsureTrailingNewline(rootPEM)) + buf.WriteString(lib.EnsureTrailingNewline(intermediate.Data["certificate"].(string))) + buf.WriteString(lib.EnsureTrailingNewline(rootPEM)) _, err = client.Logical().Write(path+"/intermediate/set-signed", map[string]interface{}{ "certificate": buf.String(), }) require.NoError(t, err, "failed to set signed intermediate") - return ca.EnsureTrailingNewline(buf.String()) + return lib.EnsureTrailingNewline(buf.String()) } diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 5010eda7fe..9193604d6a 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -6,9 +6,9 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/agent/connect" - "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" ) func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) { @@ -59,7 +59,7 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind ExternalTrustDomain: r.ExternalTrustDomain, NotBefore: r.NotBefore, NotAfter: r.NotAfter, - RootCert: ca.EnsureTrailingNewline(r.RootCert), + RootCert: lib.EnsureTrailingNewline(r.RootCert), IntermediateCerts: intermediates, RaftIndex: r.RaftIndex, Active: r.Active, diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 7dd7a8846a..b18c6487f4 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) // CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from @@ -100,6 +101,10 @@ func CacheServiceList(c *cache.Cache) proxycfg.ServiceList { return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName} } +func CacheTrustBundle(c *cache.Cache) proxycfg.TrustBundle { + return &cacheProxyDataSource[*pbpeering.TrustBundleReadRequest]{c, cachetype.TrustBundleReadName} +} + // cacheProxyDataSource implements a generic wrapper around the agent cache to // provide data to the proxycfg.Manager. type cacheProxyDataSource[ReqType cache.Request] struct { diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 8138a0ac00..923b15b73b 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) type handlerConnectProxy struct { @@ -23,6 +24,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.WatchedPeerTrustBundles = make(map[string]context.CancelFunc) + snap.ConnectProxy.PeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle) snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) @@ -193,6 +196,20 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil { return snap, fmt.Errorf("error while resetting watches from chain: %w", err) } + + // Check whether a watch for this peer exists to avoid duplicates. + if _, ok := snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer]; !ok { + peerCtx, cancel := context.WithCancel(ctx) + if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{ + Name: uid.Peer, + Partition: uid.PartitionOrDefault(), + }, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil { + cancel() + return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) + } + + snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer] = cancel + } continue } @@ -231,6 +248,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s return fmt.Errorf("invalid type for response: %T", u.Result) } snap.Roots = roots + + case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix): + resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix) + if resp.Bundle != nil { + snap.ConnectProxy.PeerTrustBundles[peer] = resp.Bundle + } + case u.CorrelationID == intentionsWatchID: resp, ok := u.Result.(*structs.IndexedIntentionMatches) if !ok { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index 694c3cbf10..1f48c15d89 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -5,6 +5,7 @@ import ( cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) // UpdateEvent contains new data for a resource we are subscribed to (e.g. an @@ -21,48 +22,66 @@ type DataSources struct { // CARoots provides updates about the CA root certificates on a notification // channel. CARoots CARoots + // CompiledDiscoveryChain provides updates about a service's discovery chain // on a notification channel. CompiledDiscoveryChain CompiledDiscoveryChain + // ConfigEntry provides updates about a single config entry on a notification // channel. ConfigEntry ConfigEntry + // ConfigEntryList provides updates about a list of config entries on a // notification channel. ConfigEntryList ConfigEntryList + // Datacenters provides updates about federated datacenters on a notification // channel. Datacenters Datacenters + // FederationStateListMeshGateways is the interface used to consume updates // about mesh gateways from the federation state. FederationStateListMeshGateways FederationStateListMeshGateways + // GatewayServices provides updates about a gateway's upstream services on a // notification channel. GatewayServices GatewayServices + // Health provides service health updates on a notification channel. Health Health + // HTTPChecks provides updates about a service's HTTP and gRPC checks on a // notification channel. HTTPChecks HTTPChecks + // Intentions provides intention updates on a notification channel. Intentions Intentions + // IntentionUpstreams provides intention-inferred upstream updates on a // notification channel. IntentionUpstreams IntentionUpstreams + // InternalServiceDump provides updates about a (gateway) service on a // notification channel. InternalServiceDump InternalServiceDump + // LeafCertificate provides updates about the service's leaf certificate on a // notification channel. LeafCertificate LeafCertificate + // PreparedQuery provides updates about the results of a prepared query. PreparedQuery PreparedQuery + // ResolvedServiceConfig provides updates about a service's resolved config. ResolvedServiceConfig ResolvedServiceConfig + // ServiceList provides updates about the list of all services in a datacenter // on a notification channel. ServiceList ServiceList + // TrustBundle provides updates about the trust bundle for a single peer. + TrustBundle TrustBundle + DataSourcesEnterprise } @@ -160,3 +179,9 @@ type ResolvedServiceConfig interface { type ServiceList interface { Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error } + +// TrustBundle is the interface used to consume updates about a single +// peer's trust bundle. +type TrustBundle interface { + Notify(ctx context.Context, req *pbpeering.TrustBundleReadRequest, correlationID string, ch chan<- UpdateEvent) error +} diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 402b48bc82..5d511c59bb 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/mitchellh/copystructure" "github.com/stretchr/testify/require" @@ -238,8 +239,10 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[1]): &upstreams[1], NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, - PassthroughIndices: map[string]indexedTarget{}, + PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, + WatchedPeerTrustBundles: map[string]context.CancelFunc{}, + PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, @@ -298,8 +301,10 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[1]): &upstreams[1], NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, - PassthroughIndices: map[string]indexedTarget{}, + PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, + WatchedPeerTrustBundles: map[string]context.CancelFunc{}, + PeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 34e0bf6d3f..cd8afd2cef 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -6,6 +6,8 @@ import ( "sort" "strings" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/mitchellh/copystructure" "github.com/hashicorp/consul/acl" @@ -42,6 +44,14 @@ type ConfigSnapshotUpstreams struct { // endpoints of an upstream. WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes + // WatchedPeerTrustBundles is a map of (PeerName -> CancelFunc) in order to cancel + // watches for peer trust bundles any time the list of upstream peers changes. + WatchedPeerTrustBundles map[string]context.CancelFunc + + // PeerTrustBundles is a map of (PeerName -> PeeringTrustBundle). + // It is used to store trust bundles for upstream TLS transport sockets. + PeerTrustBundles map[string]*pbpeering.PeeringTrustBundle + // WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() -> // CancelFunc) in order to cancel watches for mesh gateways WatchedGateways map[UpstreamID]map[string]context.CancelFunc @@ -133,6 +143,8 @@ func (c *configSnapshotConnectProxy) isEmpty() bool { len(c.WatchedDiscoveryChains) == 0 && len(c.WatchedUpstreams) == 0 && len(c.WatchedUpstreamEndpoints) == 0 && + len(c.WatchedPeerTrustBundles) == 0 && + len(c.PeerTrustBundles) == 0 && len(c.WatchedGateways) == 0 && len(c.WatchedGatewayEndpoints) == 0 && len(c.WatchedServiceChecks) == 0 && @@ -532,6 +544,15 @@ func (s *ConfigSnapshot) Leaf() *structs.IssuedCert { } } +// RootPEMs returns all PEM-encoded public certificates for the root CA. +func (s *ConfigSnapshot) RootPEMs() string { + var rootPEMs string + for _, root := range s.Roots.Roots { + rootPEMs += lib.EnsureTrailingNewline(root.RootCert) + } + return rootPEMs +} + func (s *ConfigSnapshot) MeshConfig() *structs.MeshConfigEntry { switch s.Kind { case structs.ServiceKindConnectProxy: diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 4ac17ebd7f..de0eec2368 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -20,6 +20,7 @@ const ( coalesceTimeout = 200 * time.Millisecond rootsWatchID = "roots" leafWatchID = "leaf" + peerTrustBundleIDPrefix = "peer-trust-bundle:" intentionsWatchID = "intentions" serviceListWatchID = "service-list" federationStateListGatewaysWatchID = "federation-state-list-mesh-gateways" diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index a2fa5914fc..7ca93da661 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -132,6 +133,7 @@ func recordWatches(sc *stateConfig) *watchRecorder { PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr}, ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr}, ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, + TrustBundle: typedWatchRecorder[*pbpeering.TrustBundleReadRequest]{wr}, } recordWatchesEnterprise(sc, wr) @@ -193,6 +195,14 @@ func verifyDatacentersWatch(t testing.TB, request any) { require.True(t, ok) } +func genVerifyTrustBundleReadWatch(peer string) verifyWatchRequest { + return func(t testing.TB, request any) { + reqReal, ok := request.(*pbpeering.TrustBundleReadRequest) + require.True(t, ok) + require.Equal(t, peer, reqReal.Name) + } +} + func genVerifyLeafWatchWithDNSSANs(expectedService string, expectedDatacenter string, expectedDNSSANs []string) verifyWatchRequest { return func(t testing.TB, request any) { reqReal, ok := request.(*cachetype.ConnectCALeafRequest) @@ -359,6 +369,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { t.Parallel() indexedRoots, issuedCert := TestCerts(t) + peerTrustBundles := TestPeerTrustBundles(t) // Used to account for differences in OSS/ent implementations of ServiceID.String() var ( @@ -2479,8 +2490,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { EvaluateInPartition: "default", Datacenter: "dc1", }), - rootsWatchID: genVerifyDCSpecificWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), + peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"), // No Peering watch }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -2497,6 +2509,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints) require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Len(t, snap.ConnectProxy.PeerTrustBundles, 0, "%+v", snap.ConnectProxy.PeerTrustBundles) require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) @@ -2527,6 +2541,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, Err: nil, }, + { + CorrelationID: peerTrustBundleIDPrefix + "peer-a", + Result: &pbpeering.TrustBundleReadResponse{ + Bundle: peerTrustBundles.Bundles[0], + }, + }, }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid()) @@ -2540,6 +2560,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedGateways, 2, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 2, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + require.Contains(t, snap.ConnectProxy.WatchedPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.PeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedPeerTrustBundles) + require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) }, diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 1edbfd0a3e..8f12ec8f39 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -20,8 +20,58 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbpeering" ) +func TestPeerTrustBundles(t testing.T) *pbpeering.TrustBundleListByServiceResponse { + t.Helper() + + return &pbpeering.TrustBundleListByServiceResponse{ + Bundles: []*pbpeering.PeeringTrustBundle{ + { + PeerName: "peer-a", + TrustDomain: "1c053652-8512-4373-90cf-5a7f6263a994.consul", + RootPEMs: []string{`-----BEGIN CERTIFICATE----- +MIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v +MRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B +CQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0 +NFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh +ZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl +ci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B +AQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV +c2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR +2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC +AwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk +yto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy +0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH +ZAuKN1aoKA== +-----END CERTIFICATE-----`}, + }, + { + PeerName: "peer-b", + TrustDomain: "d89ac423-e95a-475d-94f2-1c557c57bf31.consul", + RootPEMs: []string{`-----BEGIN CERTIFICATE----- +MIICcTCCAdoCCQDyGxC08cD0BDANBgkqhkiG9w0BAQsFADB9MQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFkMQwwCgYDVQQKDANGb28x +EDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXItYjEdMBsGCSqGSIb3DQEJ +ARYOZm9vQHBlZXItYi5jb20wHhcNMjIwNTI2MDExNjE2WhcNMjMwNTI2MDExNjE2 +WjB9MQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFk +MQwwCgYDVQQKDANGb28xEDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXIt +YjEdMBsGCSqGSIb3DQEJARYOZm9vQHBlZXItYi5jb20wgZ8wDQYJKoZIhvcNAQEB +BQADgY0AMIGJAoGBAL4i5erdZ5vKk3mzW9Qt6Wvw/WN/IpMDlL0a28wz9oDCtMLN +cD/XQB9yT5jUwb2s4mD1lCDZtee8MHeD8zygICozufWVB+u2KvMaoA50T9GMQD0E +z/0nz/Z703I4q13VHeTpltmEpYcfxw/7nJ3leKA34+Nj3zteJ70iqvD/TNBBAgMB +AAEwDQYJKoZIhvcNAQELBQADgYEAbL04gicH+EIznDNhZJEb1guMBtBBJ8kujPyU +ao8xhlUuorDTLwhLpkKsOhD8619oSS8KynjEBichidQRkwxIaze0a2mrGT+tGBMf +pVz6UeCkqpde6bSJ/ozEe/2seQzKqYvRT1oUjLwYvY7OIh2DzYibOAxh6fewYAmU +5j5qNLc= +-----END CERTIFICATE-----`}, + }, + }, + } +} + // TestCerts generates a CA and Leaf suitable for returning as mock CA // root/leaf cache requests. func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) { @@ -671,6 +721,7 @@ func testConfigSnapshotFixture( PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{}, ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{}, ServiceList: &noopDataSource[*structs.DCSpecificRequest]{}, + TrustBundle: &noopDataSource[*pbpeering.TrustBundleReadRequest]{}, }, dnsConfig: DNSConfig{ // TODO: make configurable Domain: "consul", @@ -870,6 +921,7 @@ func NewTestDataSources() *TestDataSources { PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](), ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](), ServiceList: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList](), + TrustBundle: NewTestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse](), } srcs.buildEnterpriseSources() return srcs @@ -892,8 +944,7 @@ type TestDataSources struct { PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse] ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse] ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList] - - TestDataSourcesEnterprise + TrustBundle *TestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse] } func (t *TestDataSources) ToDataSources() DataSources { @@ -913,6 +964,7 @@ func (t *TestDataSources) ToDataSources() DataSources { PreparedQuery: t.PreparedQuery, ResolvedServiceConfig: t.ResolvedServiceConfig, ServiceList: t.ServiceList, + TrustBundle: t.TrustBundle, } t.fillEnterpriseDataSources(&ds) return ds diff --git a/agent/proxycfg/testing_peering.go b/agent/proxycfg/testing_peering.go index 3b469f79d4..09043c3421 100644 --- a/agent/proxycfg/testing_peering.go +++ b/agent/proxycfg/testing_peering.go @@ -4,6 +4,7 @@ import ( "github.com/mitchellh/go-testing-interface" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" ) func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { @@ -29,6 +30,12 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { refundsUpstream, } }, []UpdateEvent{ + { + CorrelationID: peerTrustBundleIDPrefix + "cloud", + Result: &pbpeering.TrustBundleReadResponse{ + Bundle: TestPeerTrustBundles(t).Bundles[0], + }, + }, { CorrelationID: "upstream-target:payments.default.default.dc1:" + paymentsUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -67,7 +74,7 @@ func TestConfigSnapshotPeering(t testing.T) *ConfigSnapshot { Port: 443, Connect: structs.ServiceConnect{ PeerMeta: &structs.PeeringServiceMeta{ - SpiffeID: []string{"spiffe://d89ac423-e95a-475d-94f2-1c557c57bf31.consul/ns/default/dc/cloud-dc/svc/refunds"}, + SpiffeID: []string{"spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/refunds"}, }, }, }, diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index e4847bec47..b07a2c808a 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -13,7 +13,6 @@ import ( envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3" envoy_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" - "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/any" @@ -79,6 +78,8 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C clusters = append(clusters, passthroughs...) } + // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go + // so that the sets of endpoints generated matches the sets of clusters. for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] @@ -87,6 +88,10 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } + if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok { + // The trust bundle for this upstream is not available yet, skip for now. + continue + } chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid] if !ok { @@ -210,9 +215,9 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, Service: uid.Name, } - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) err := injectSANMatcher(commonTLSContext, spiffeID.URI().String()) @@ -598,9 +603,9 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } // Enable TLS upstream with the configured client certificate. - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) err = injectSANMatcher(commonTLSContext, spiffeIDs...) @@ -794,9 +799,13 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( } } - commonTLSContext := makeCommonTLSContextFromLeaf( - cfgSnap, + rootPEMs := cfgSnap.RootPEMs() + if uid.Peer != "" { + rootPEMs = cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer].ConcatenatedRootPEMs() + } + commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), + rootPEMs, makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) @@ -809,7 +818,6 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( CommonTlsContext: commonTLSContext, Sni: sni, } - transportSocket, err := makeUpstreamTLSTransportSocket(tlsContext) if err != nil { return nil, err diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 711f854b20..dd415062ef 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -48,6 +48,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) + // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go + // so that the sets of endpoints generated matches the sets of clusters. for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] @@ -56,6 +58,10 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } + if _, ok := cfgSnap.ConnectProxy.PeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok { + // The trust bundle for this upstream is not available yet, skip for now. + continue + } es := s.endpointsFromDiscoveryChain( uid, diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 6b03122e8b..01df376410 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -12,7 +12,7 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/connect/ca" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -761,9 +761,9 @@ func injectHTTPFilterOnFilterChains( func (s *ResourceGenerator) injectConnectTLSOnFilterChains(cfgSnap *proxycfg.ConfigSnapshot, listener *envoy_listener_v3.Listener) error { for idx := range listener.FilterChains { tlsContext := &envoy_tls_v3.DownstreamTlsContext{ - CommonTlsContext: makeCommonTLSContextFromLeaf( - cfgSnap, + CommonTlsContext: makeCommonTLSContext( cfgSnap.Leaf(), + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSIncoming()), ), RequireClientCertificate: &wrappers.BoolValue{Value: true}, @@ -1109,9 +1109,9 @@ func (s *ResourceGenerator) makeFilterChainTerminatingGateway( protocol string, ) (*envoy_listener_v3.FilterChain, error) { tlsContext := &envoy_tls_v3.DownstreamTlsContext{ - CommonTlsContext: makeCommonTLSContextFromLeaf( - cfgSnap, + CommonTlsContext: makeCommonTLSContext( cfgSnap.TerminatingGateway.ServiceLeaves[service], + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSIncoming()), ), RequireClientCertificate: &wrappers.BoolValue{Value: true}, @@ -1637,21 +1637,14 @@ func makeEnvoyHTTPFilter(name string, cfg proto.Message) (*envoy_http_v3.HttpFil }, nil } -func makeCommonTLSContextFromLeaf( - cfgSnap *proxycfg.ConfigSnapshot, +func makeCommonTLSContext( leaf *structs.IssuedCert, + rootPEMs string, tlsParams *envoy_tls_v3.TlsParameters, ) *envoy_tls_v3.CommonTlsContext { - // Concatenate all the root PEMs into one. - if cfgSnap.Roots == nil { + if rootPEMs == "" { return nil } - - rootPEMS := "" - for _, root := range cfgSnap.Roots.Roots { - rootPEMS += ca.EnsureTrailingNewline(root.RootCert) - } - if tlsParams == nil { tlsParams = &envoy_tls_v3.TlsParameters{} } @@ -1662,12 +1655,12 @@ func makeCommonTLSContextFromLeaf( { CertificateChain: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: ca.EnsureTrailingNewline(leaf.CertPEM), + InlineString: lib.EnsureTrailingNewline(leaf.CertPEM), }, }, PrivateKey: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: ca.EnsureTrailingNewline(leaf.PrivateKeyPEM), + InlineString: lib.EnsureTrailingNewline(leaf.PrivateKeyPEM), }, }, }, @@ -1677,7 +1670,7 @@ func makeCommonTLSContextFromLeaf( // TODO(banks): later for L7 support we may need to configure ALPN here. TrustedCa: &envoy_core_v3.DataSource{ Specifier: &envoy_core_v3.DataSource_InlineString{ - InlineString: rootPEMS, + InlineString: rootPEMs, }, }, }, diff --git a/agent/xds/listeners_ingress.go b/agent/xds/listeners_ingress.go index 5261bdb50c..ba2019b435 100644 --- a/agent/xds/listeners_ingress.go +++ b/agent/xds/listeners_ingress.go @@ -180,7 +180,7 @@ func makeCommonTLSContextFromSnapshotListenerConfig(cfgSnap *proxycfg.ConfigSnap // Set up listener TLS from SDS tlsContext = makeCommonTLSContextFromGatewayTLSConfig(*tlsCfg) } else if connectTLSEnabled { - tlsContext = makeCommonTLSContextFromLeaf(cfgSnap, cfgSnap.Leaf(), makeTLSParametersFromGatewayTLSConfig(*tlsCfg)) + tlsContext = makeCommonTLSContext(cfgSnap.Leaf(), cfgSnap.RootPEMs(), makeTLSParametersFromGatewayTLSConfig(*tlsCfg)) } return tlsContext, nil diff --git a/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden b/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden index befaa062da..be0a0ef5a0 100644 --- a/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden +++ b/agent/xds/testdata/clusters/connect-proxy-with-peered-upstreams.latest.golden @@ -71,7 +71,7 @@ ], "validationContext": { "trustedCa": { - "inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n" + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV\nUzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v\nMRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B\nCQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0\nNFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh\nZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl\nci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B\nAQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV\nc2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR\n2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC\nAwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk\nyto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy\n0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH\nZAuKN1aoKA==\n-----END CERTIFICATE-----\n" }, "matchSubjectAltNames": [ { @@ -129,11 +129,11 @@ ], "validationContext": { "trustedCa": { - "inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n" + "inlineString": "-----BEGIN CERTIFICATE-----\nMIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV\nUzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v\nMRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B\nCQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0\nNFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh\nZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl\nci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B\nAQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV\nc2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR\n2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC\nAwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk\nyto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy\n0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH\nZAuKN1aoKA==\n-----END CERTIFICATE-----\n" }, "matchSubjectAltNames": [ { - "exact": "spiffe://d89ac423-e95a-475d-94f2-1c557c57bf31.consul/ns/default/dc/cloud-dc/svc/refunds" + "exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cloud-dc/svc/refunds" } ] } diff --git a/lib/strings.go b/lib/strings.go new file mode 100644 index 0000000000..fea1cf58b1 --- /dev/null +++ b/lib/strings.go @@ -0,0 +1,18 @@ +package lib + +import ( + "strings" +) + +// EnsureTrailingNewline adds a newline suffix to the input if not present. +// This is typically used to fix a case where the CA provider does not return a new line +// after certificates as per the specification. See GH-8178 for more context. +func EnsureTrailingNewline(str string) string { + if str == "" { + return str + } + if strings.HasSuffix(str, "\n") { + return str + } + return str + "\n" +} diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go index b9da132f83..f19650f3c0 100644 --- a/proto/pbpeering/peering.go +++ b/proto/pbpeering/peering.go @@ -1,9 +1,14 @@ package pbpeering import ( + "strconv" "time" + "github.com/mitchellh/hashstructure" + + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" ) // TODO(peering): These are byproducts of not embedding @@ -88,6 +93,48 @@ func (x ReplicationMessage_Response_Operation) GoString() string { return x.String() } +func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + // TODO(peering): Revisit whether this is the token to use once request types accept a token. + Token: r.Token(), + Datacenter: r.Datacenter, + MinIndex: 0, + Timeout: 0, + MustRevalidate: false, + + // TODO(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works. + // Using an exponential backoff when the result hasn't changed may be preferable. + MaxAge: 1 * time.Second, + } + + v, err := hashstructure.Hash([]interface{}{ + r.Partition, + r.Name, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + +// ConcatenatedRootPEMs concatenates and returns all PEM-encoded public certificates +// in a peer's trust bundle. +func (b *PeeringTrustBundle) ConcatenatedRootPEMs() string { + if b == nil { + return "" + } + + var rootPEMs string + for _, pem := range b.RootPEMs { + rootPEMs += lib.EnsureTrailingNewline(pem) + } + return rootPEMs +} + // enumcover:PeeringState func PeeringStateToAPI(s PeeringState) api.PeeringState { switch s {