From e69bc727ecf11bd5d993364a4522c2b38d40286c Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 10 Oct 2022 13:45:30 -0600 Subject: [PATCH 1/8] Update peering establishment to maybe use gateways When peering through mesh gateways we expect outbound dials to peer servers to flow through the local mesh gateway addresses. Now when establishing a peering we get a list of dial addresses as a ring buffer that includes local mesh gateway addresses if the local DC is configured to peer through mesh gateways. The ring buffer includes the mesh gateway addresses first, but also includes the remote server addresses as a fallback. This fallback is present because it's possible that direct egress from the servers may be allowed. If not allowed then the leader will cycle back to a mesh gateway address through the ring. When attempting to dial the remote servers we retry up to a fixed timeout. If using mesh gateways we also have an initial wait in order to allow for the mesh gateways to configure themselves. Note that if we encounter a permission denied error we do not retry since that error indicates that the secret in the peering token is invalid. --- agent/consul/leader_peering_test.go | 2 +- agent/consul/peering_backend.go | 118 ++++++++++++-- agent/consul/peering_backend_test.go | 220 ++++++++++++++++++++++++++- agent/rpc/peering/service.go | 151 +++++++++++++----- agent/rpc/peering/service_test.go | 183 ++++++++++++++++++++++ 5 files changed, 615 insertions(+), 59 deletions(-) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 5f59787764..373d80f25d 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -567,7 +567,7 @@ func testLeader_PeeringSync_failsForTLSError(t *testing.T, tokenMutateFn func(to } // Since the Establish RPC dials the remote cluster, it will yield the TLS error. - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) _, err = s2Client.Establish(ctx, &establishReq) require.Contains(t, err.Error(), expectErr) diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 26c2f19436..064015cb67 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -1,12 +1,16 @@ package consul import ( + "container/ring" "encoding/base64" "encoding/json" "fmt" "strconv" "sync" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/connect" @@ -85,29 +89,117 @@ func (b *PeeringBackend) GetTLSMaterials(generatingToken bool) (string, []string return serverName, caPems, nil } -// GetServerAddresses looks up server or mesh gateway addresses from the state store. -func (b *PeeringBackend) GetServerAddresses() ([]string, error) { - _, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta()) - if err != nil { - return nil, fmt.Errorf("failed to read mesh config entry: %w", err) - } +// GetLocalServerAddresses looks up server or mesh gateway addresses from the state store for a peer to dial. 
+func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) {
+	store := b.srv.fsm.State()
 
-	meshConfig, ok := rawEntry.(*structs.MeshConfigEntry)
-	if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways {
-		return meshGatewayAdresses(b.srv.fsm.State())
+	useGateways, err := b.PeerThroughMeshGateways(nil)
+	if err != nil {
+		// For inbound traffic we prefer to fail fast if we can't determine whether we should peer through MGW.
+		// This prevents unexpectedly sharing local server addresses when a user only intended to peer through gateways.
+		return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err)
 	}
 
-	return serverAddresses(b.srv.fsm.State())
+	if useGateways {
+		return meshGatewayAdresses(store, nil, true)
+	}
+	return serverAddresses(store)
 }
 
-func meshGatewayAdresses(state *state.Store) ([]string, error) {
-	_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
+// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
+// a boolean indicating whether mesh gateways are present, and an optional error.
+// The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present.
+func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) {
+	newRing, err := b.fetchPeerServerAddresses(ws, peerID)
+	if err != nil {
+		return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err)
+	}
+
+	gatewayRing, err := b.maybeFetchGatewayAddresses(ws)
+	if err != nil {
+		// If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses.
+		logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly", "error", err)
+		return newRing, false, nil
+	}
+	if gatewayRing != nil {
+		// The ordering is important here. We always want to start with the mesh gateway
+		// addresses and fall back to the remote addresses, so we append the server addresses
+		// in newRing to gatewayRing.
+		newRing = gatewayRing.Link(newRing)
+	}
+	return newRing, gatewayRing != nil, nil
+}
+
+// fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses.
+// If the peering is no longer active or does not have addresses, then we return an error.
+func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID string) (*ring.Ring, error) {
+	_, peering, err := b.Store().PeeringReadByID(ws, peerID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch peer %q: %w", peerID, err)
+	}
+	if !peering.IsActive() {
+		return nil, fmt.Errorf("there is no active peering for %q", peerID)
+	}
+
+	// IMPORTANT: The address ring buffer must always be length > 0
+	if len(peering.PeerServerAddresses) == 0 {
+		return nil, fmt.Errorf("peer %q has no addresses to dial", peerID)
+	}
+	return bufferFromAddresses(peering.PeerServerAddresses), nil
+}
+
+// maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the
+// local datacenter is configured to peer through mesh gateways and there are local gateways registered.
+// If neither of these are true then we return a nil buffer.
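+// A non-nil ws is populated with watches on the mesh config entry and on the local
+// gateway registrations, so callers can block on changes to either data source.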
+func (b *PeeringBackend) maybeFetchGatewayAddresses(ws memdb.WatchSet) (*ring.Ring, error) { + useGateways, err := b.PeerThroughMeshGateways(ws) + if err != nil { + return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err) + } + if useGateways { + addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false) + + // IMPORTANT: The address ring buffer must always be length > 0 + if err != nil || len(addresses) == 0 { + return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err) + } + return bufferFromAddresses(addresses), nil + } + return nil, nil +} + +// PeerThroughMeshGateways determines if the config entry to enable peering control plane +// traffic through a mesh gateway is set to enable. +func (b *PeeringBackend) PeerThroughMeshGateways(ws memdb.WatchSet) (bool, error) { + _, rawEntry, err := b.srv.fsm.State().ConfigEntry(ws, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta()) + if err != nil { + return false, fmt.Errorf("failed to read mesh config entry: %w", err) + } + mesh, ok := rawEntry.(*structs.MeshConfigEntry) + if rawEntry != nil && !ok { + return false, fmt.Errorf("got unexpected type for mesh config entry: %T", rawEntry) + } + return mesh.PeerThroughMeshGateways(), nil + +} + +func bufferFromAddresses(addresses []string) *ring.Ring { + ring := ring.New(len(addresses)) + for _, addr := range addresses { + ring.Value = addr + ring = ring.Next() + } + return ring +} + +func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) { + _, nodes, err := state.ServiceDump(ws, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword) if err != nil { return nil, fmt.Errorf("failed to dump gateway addresses: %w", err) } var addrs []string for _, node := range nodes { - _, addr, port := node.BestAddress(true) + _, addr, port := node.BestAddress(wan) addrs = append(addrs, ipaddr.FormatAddressPort(addr, port)) } if len(addrs) == 0 { diff --git a/agent/consul/peering_backend_test.go b/agent/consul/peering_backend_test.go index 0d834c09a9..63a42e07a9 100644 --- a/agent/consul/peering_backend_test.go +++ b/agent/consul/peering_backend_test.go @@ -10,6 +10,7 @@ import ( gogrpc "google.golang.org/grpc" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" @@ -76,7 +77,7 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) { }) } -func TestPeeringBackend_GetServerAddresses(t *testing.T) { +func TestPeeringBackend_GetLocalServerAddresses(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -91,7 +92,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { backend := NewPeeringBackend(srv) testutil.RunStep(t, "peer to servers", func(t *testing.T) { - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort) @@ -107,7 +108,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) // Still expect server address because PeerThroughMeshGateways was not enabled. 
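
The front-loading in GetDialAddresses relies on a subtle property of container/ring: Link splices the
server ring in after the current gateway element and returns the element that previously followed it,
so traversal from the returned element visits every gateway address before any server address. A
minimal, runnable sketch of the pattern (the addresses are placeholders; only the ring mechanics
mirror the code above):

    package main

    import (
        "container/ring"
        "fmt"
    )

    // bufferFromAddresses mirrors the helper above: it loads addresses into a
    // fixed-size ring so callers can cycle through them with Next().
    func bufferFromAddresses(addresses []string) *ring.Ring {
        r := ring.New(len(addresses))
        for _, addr := range addresses {
            r.Value = addr
            r = r.Next()
        }
        return r
    }

    func main() {
        // Placeholders standing in for registered local gateways and for the
        // peer's server addresses from the peering token.
        gateways := bufferFromAddresses([]string{"gw-1:8443", "gw-2:8443"})
        servers := bufferFromAddresses([]string{"srv-1:8502", "srv-2:8502"})

        // Link splices the server ring in after the first gateway element and
        // returns the element that previously followed it (gw-2). Traversing
        // from that element visits every gateway before any server.
        combined := gateways.Link(servers)

        combined.Do(func(value any) { fmt.Println(value) })
        // Prints: gw-2:8443, gw-1:8443, srv-1:8502, srv-2:8502
    }

This rotated-but-gateways-first order is exactly what TestPeeringBackend_GetDialAddresses asserts
below: the second registered gateway's address leads the expected list.
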
@@ -121,7 +122,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.Nil(t, addrs) testutil.RequireErrorContains(t, err, "servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered") @@ -147,12 +148,221 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureRegistration(2, ®)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) require.Equal(t, []string{"154.238.12.252:8443"}, addrs) }) } +func TestPeeringBackend_GetDialAddresses(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, cfg := testServerConfig(t) + cfg.GRPCTLSPort = freeport.GetOne(t) + + srv, err := newServer(t, cfg) + require.NoError(t, err) + testrpc.WaitForLeader(t, srv.RPC, "dc1") + + backend := NewPeeringBackend(srv) + + dialerPeerID := testUUID() + acceptorPeerID := testUUID() + + type expectation struct { + addrs []string + haveGateways bool + err string + } + + type testCase struct { + name string + setup func(store *state.Store) + peerID string + expect expectation + } + + run := func(t *testing.T, tc testCase) { + if tc.setup != nil { + tc.setup(srv.fsm.State()) + } + + ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID) + if tc.expect.err != "" { + testutil.RequireErrorContains(t, err, tc.expect.err) + return + } + require.Equal(t, tc.expect.haveGateways, haveGateways) + require.NotNil(t, ring) + + var addrs []string + ring.Do(func(value any) { + addr, ok := value.(string) + + require.True(t, ok) + addrs = append(addrs, addr) + }) + require.Equal(t, tc.expect.addrs, addrs) + } + + // NOTE: The following tests are set up to run serially with RunStep to save on the setup/teardown cost for a test server. + tt := []testCase{ + { + name: "unknown peering", + setup: func(store *state.Store) { + // Test peering is not written during setup + }, + peerID: acceptorPeerID, + expect: expectation{ + err: fmt.Sprintf(`there is no active peering for %q`, acceptorPeerID), + }, + }, + { + name: "no server addresses", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(1, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "acceptor", + ID: acceptorPeerID, + // Acceptor peers do not have PeerServerAddresses populated locally. 
+ }, + })) + }, + peerID: acceptorPeerID, + expect: expectation{ + err: fmt.Sprintf(`peer %q has no addresses to dial`, acceptorPeerID), + }, + }, + { + name: "only server addrs are returned when mesh config does not exist", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(2, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "dialer", + ID: dialerPeerID, + PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + })) + + // Mesh config entry does not exist + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "only server addrs are returned when not peering through gateways", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureConfigEntry(3, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: false, // Peering through gateways is not enabled + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "only server addrs are returned when peering through gateways without gateways registered", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureConfigEntry(4, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // No gateways are registered + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + + // Fall back to remote server addresses + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "gateway addresses are included after gateways are registered", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureRegistration(5, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-1", + Address: "5.6.7.8", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: 8443, + TaggedAddresses: map[string]structs.ServiceAddress{ + structs.TaggedAddressWAN: { + Address: "my-lb-addr.not-aws.com", + Port: 443, + }, + }, + }, + })) + require.NoError(t, srv.fsm.State().EnsureRegistration(6, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-2", + Address: "6.7.8.9", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-2", + Service: "mesh-gateway", + Port: 8443, + TaggedAddresses: map[string]structs.ServiceAddress{ + structs.TaggedAddressWAN: { + Address: "my-other-lb-addr.not-aws.com", + Port: 443, + }, + }, + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: true, + + // Gateways come first, and we use their LAN addresses since this is for outbound communication. 
+ addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "addresses are not returned if the peering is deleted", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(5, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "dialer", + ID: dialerPeerID, + PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + + // Mark as deleted + State: pbpeering.PeeringState_DELETING, + DeletedAt: structs.TimeToProto(time.Now()), + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + err: fmt.Sprintf(`there is no active peering for %q`, dialerPeerID), + }, + }, + } + + for _, tc := range tt { + testutil.RunStep(t, tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) { return func(ctx context.Context, addr string) (net.Conn, error) { d := net.Dialer{} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index c5abb3d9af..c6eb78dea9 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -1,6 +1,7 @@ package peering import ( + "container/ring" "context" "errors" "fmt" @@ -8,6 +9,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -36,6 +38,15 @@ var ( errPeeringTokenEmptyPeerID = errors.New("peering token peer ID value is empty") ) +const ( + // meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway. + // This wait provides some time for the first gateway address to configure a route to the peer servers. + // Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation: + // https://www.hashicorp.com/cgsb + meshGatewayWait = 350 * time.Millisecond + establishmentTimeout = 5 * time.Second +) + // errPeeringInvalidServerAddress is returned when an establish request contains // an invalid server address. type errPeeringInvalidServerAddress struct { @@ -118,9 +129,9 @@ type Backend interface { // It returns the server name to validate, and the CA certificate to validate with. GetTLSMaterials(generatingToken bool) (string, []string, error) - // GetServerAddresses returns the addresses used for establishing a peering connection. + // GetLocalServerAddresses returns the addresses used for establishing a peering connection. // These may be server addresses or mesh gateway addresses if peering through mesh gateways. - GetServerAddresses() ([]string, error) + GetLocalServerAddresses() ([]string, error) // EncodeToken packages a peering token into a slice of bytes. EncodeToken(tok *structs.PeeringToken) ([]byte, error) @@ -128,6 +139,12 @@ type Backend interface { // DecodeToken unpackages a peering token from a slice of bytes. DecodeToken([]byte) (*structs.PeeringToken, error) + // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, + // a boolean indicating whether mesh gateways are present, and an optional error. + // The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local + // datacenter is configured to dial through mesh gateways. 
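+	// The returned boolean reports whether the ring is front-loaded with local mesh gateway
+	// addresses; callers use it to decide whether to wait before dialing, giving the local
+	// gateways time to configure a route to the peer servers.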
+ GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) + EnterpriseCheckPartitions(partition string) error EnterpriseCheckNamespaces(namespace string) error @@ -298,7 +315,7 @@ func (s *Server) GenerateToken( if len(req.ServerExternalAddresses) > 0 { serverAddrs = req.ServerExternalAddresses } else { - serverAddrs, err = s.Backend.GetServerAddresses() + serverAddrs, err = s.Backend.GetLocalServerAddresses() if err != nil { return nil, err } @@ -419,7 +436,13 @@ func (s *Server) Establish( PeerServerName: tok.ServerName, PeerID: tok.PeerID, Meta: req.Meta, - State: pbpeering.PeeringState_ESTABLISHING, + + // State is intentionally not set until after the secret exchange succeeds. + // This is to prevent a scenario where an active peering is re-established, + // the secret exchange fails, and the peering state gets stuck in "Establishing" + // while the original connection is still active. + // State: pbpeering.PeeringState_ESTABLISHING, + // PartitionOrEmpty is used to avoid writing "default" in OSS. Partition: entMeta.PartitionOrEmpty(), Remote: &pbpeering.RemoteInfo{ @@ -428,39 +451,30 @@ func (s *Server) Establish( }, } - tlsOption, err := peering.TLSDialOption() - if err != nil { - return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) + // Write the peering ahead of the ExchangeSecret handshake to give + // mesh gateways in the default partition an opportunity + // to update their config with an outbound route to this peer server. + // + // If the request to exchange a secret fails then the peering will continue to exist. + // We do not undo this write because this call to establish may actually be a re-establish call + // for an active peering. + writeReq := &pbpeering.PeeringWriteRequest{ + Peering: peering, + } + if err := s.Backend.PeeringWrite(writeReq); err != nil { + return nil, fmt.Errorf("failed to write peering: %w", err) } - exchangeReq := pbpeerstream.ExchangeSecretRequest{ - PeerID: peering.PeerID, - EstablishmentSecret: tok.EstablishmentSecret, - } - var exchangeResp *pbpeerstream.ExchangeSecretResponse - - // Loop through the known server addresses once, attempting to fetch the long-lived stream secret. - var dialErrors error - for _, addr := range serverAddrs { - exchangeResp, err = exchangeSecret(ctx, addr, tlsOption, &exchangeReq) - if err != nil { - dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret with %q: %w", addr, err)) - } - if exchangeResp != nil { - break - } - } + exchangeResp, dialErrors := s.exchangeSecret(ctx, peering, tok.EstablishmentSecret) if exchangeResp == nil { return nil, dialErrors } + peering.State = pbpeering.PeeringState_ESTABLISHING // As soon as a peering is written with a non-empty list of ServerAddresses // and an active stream secret, a leader routine will see the peering and // attempt to establish a peering stream with the remote peer. - // - // This peer now has a record of both the LocalPeerID(ID) and - // RemotePeerID(PeerID) but at this point the other peer does not. 
-	writeReq := &pbpeering.PeeringWriteRequest{
+	writeReq = &pbpeering.PeeringWriteRequest{
 		Peering: peering,
 		SecretsRequest: &pbpeering.SecretsWriteRequest{
 			PeerID: peering.ID,
@@ -474,7 +488,6 @@
 	if err := s.Backend.PeeringWrite(writeReq); err != nil {
 		return nil, fmt.Errorf("failed to write peering: %w", err)
 	}
-
 	// TODO(peering): low prio: consider adding response details
 	return resp, nil
 }
@@ -493,20 +506,78 @@
 	return nil
 }
 
-func exchangeSecret(ctx context.Context, addr string, tlsOption grpc.DialOption, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error) {
-	dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+// exchangeSecret will continuously attempt to exchange the given establishment secret with the peer, up to a timeout.
+// This function will attempt to dial through mesh gateways if the local DC is configured to peer through gateways,
+// but will fall back to server addresses if not.
+func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, establishmentSecret string) (*pbpeerstream.ExchangeSecretResponse, error) {
+	req := pbpeerstream.ExchangeSecretRequest{
+		PeerID:              peering.PeerID,
+		EstablishmentSecret: establishmentSecret,
+	}
+
+	tlsOption, err := peering.TLSDialOption()
+	if err != nil {
+		return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err)
+	}
+
+	ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err)
+	}
+
+	var (
+		resp       *pbpeerstream.ExchangeSecretResponse
+		dialErrors error
+	)
+
+	retryWait := 150 * time.Millisecond
+	jitter := retry.NewJitter(25)
+
+	if usingGateways {
+		// If we are dialing through local gateways we sleep before issuing the first request.
+		// This gives the local gateways some time to configure a route to the peer servers.
+		time.Sleep(meshGatewayWait)
+	}
+
+	retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout)
 	defer cancel()
 
-	conn, err := grpc.DialContext(dialCtx, addr,
-		tlsOption,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("failed to dial peer: %w", err)
-	}
-	defer conn.Close()
+	for retryCtx.Err() == nil {
+		addr := ringBuf.Value.(string)
 
-	client := pbpeerstream.NewPeerStreamServiceClient(conn)
-	return client.ExchangeSecret(ctx, req)
+		dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+		defer cancel()
+
+		conn, err := grpc.DialContext(dialCtx, addr,
+			tlsOption,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to dial peer: %w", err)
+		}
+		defer conn.Close()
+
+		client := pbpeerstream.NewPeerStreamServiceClient(conn)
+		resp, err = client.ExchangeSecret(ctx, &req)
+
+		// If we get a permission denied error, our establishment secret is invalid, so we do not retry.
+		grpcErr, ok := grpcstatus.FromError(err)
+		if ok && grpcErr.Code() == codes.PermissionDenied {
+			return nil, fmt.Errorf("a new peering token must be generated: %w", grpcErr.Err())
+		}
+		if err != nil {
+			dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret through address %q: %w", addr, err))
+		}
+		if resp != nil {
+			// Got a valid response. We're done.
+			break
+		}
+
+		time.Sleep(jitter(retryWait))
+
+		// Cycle to the next possible address.
+ ringBuf = ringBuf.Next() + } + return resp, dialErrors } // OPTIMIZE: Handle blocking queries diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 300e463bee..f2db059e75 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/google/tcpproxy" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -476,6 +477,188 @@ func TestPeeringService_Establish(t *testing.T) { }) } +func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { + // This test is timing-sensitive, must not be run in parallel. + // t.Parallel() + + acceptor := newTestServer(t, func(conf *consul.Config) { + conf.NodeName = "acceptor" + }) + acceptorClient := pbpeering.NewPeeringServiceClient(acceptor.ClientConn(t)) + + dialer := newTestServer(t, func(conf *consul.Config) { + conf.NodeName = "dialer" + conf.Datacenter = "dc2" + conf.PrimaryDatacenter = "dc2" + }) + dialerClient := pbpeering.NewPeeringServiceClient(dialer.ClientConn(t)) + + var peeringToken string + + testutil.RunStep(t, "retry until timeout on dial errors", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + testToken := structs.PeeringToken{ + ServerAddresses: []string{fmt.Sprintf("127.0.0.1:%d", freeport.GetOne(t))}, + PeerID: testUUID(t), + } + testTokenJSON, _ := json.Marshal(&testToken) + testTokenB64 := base64.StdEncoding.EncodeToString(testTokenJSON) + + start := time.Now() + _, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: testTokenB64, + }) + require.Error(t, err) + testutil.RequireErrorContains(t, err, "connection refused") + + require.Greater(t, time.Since(start), 5*time.Second) + }) + + testutil.RunStep(t, "peering can be established from token", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // Generate a peering token for dialer + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + // Capture peering token for re-use later + peeringToken = tokenResp.PeeringToken + + // The context timeout is short, it checks that we do not wait the 350ms that we do when peering through mesh gateways + ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond) + t.Cleanup(cancel) + + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + }) + + testutil.RunStep(t, "fail fast on permission denied", func(t *testing.T) { + // This test case re-uses the previous token since the establishment secret will have been invalidated. + // The context timeout is short, it checks that we do not retry. 
+ ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + t.Cleanup(cancel) + + _, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: peeringToken, + }) + testutil.RequireErrorContains(t, err, "a new peering token must be generated") + }) + + gatewayPort := freeport.GetOne(t) + + testutil.RunStep(t, "fail past bad mesh gateway", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + // Generate a new peering token for the dialer. + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + store := dialer.Server.FSM().State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a gateway that isn't actually listening. + require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{ + ID: types.NodeID(testUUID(t)), + Node: "gateway-node-1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: gatewayPort, + }, + })) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // Call to establish should succeed when we fall back to remote server address. + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + }) + + testutil.RunStep(t, "route through gateway", func(t *testing.T) { + // Spin up a proxy listening at the gateway port registered above. + gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort) + + // Configure a TCP proxy with an SNI route corresponding to the acceptor cluster. + var proxy tcpproxy.Proxy + target := &connWrapper{ + proxy: tcpproxy.DialProxy{ + Addr: acceptor.PublicGRPCAddr, + }, + } + proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target) + proxy.AddStopACMESearch(gatewayAddr) + + require.NoError(t, proxy.Start()) + t.Cleanup(func() { + proxy.Close() + proxy.Wait() + }) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + // Generate a new peering token for the dialer. + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + store := dialer.Server.FSM().State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + start := time.Now() + + // Call to establish should succeed through the proxy. + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + + // Dialing through a gateway is preceded by a mandatory 350ms sleep. + require.Greater(t, time.Since(start), 350*time.Millisecond) + + // target.called is true when the tcproxy's conn handler was invoked. + // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway. 
+ require.True(t, target.called) + }) +} + +// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection. +type connWrapper struct { + proxy tcpproxy.DialProxy + called bool +} + +func (w *connWrapper) HandleConn(src net.Conn) { + w.called = true + w.proxy.HandleConn(src) +} + func TestPeeringService_Establish_ACLEnforcement(t *testing.T) { validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160") validTokenJSON, _ := json.Marshal(&validToken) From 2c99a215960f733c150298cd81dcecbb693e5250 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 10 Oct 2022 13:54:36 -0600 Subject: [PATCH 2/8] Update leader routine to maybe use gateways --- agent/consul/leader_peering.go | 132 ++++++++------- agent/consul/leader_peering_test.go | 238 +++++++++++++++++++++++++++ agent/consul/peering_backend.go | 21 +-- agent/consul/peering_backend_test.go | 2 +- 4 files changed, 318 insertions(+), 75 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 5bdcfe500c..c9f3394983 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, continue } - status, found := s.peerStreamServer.StreamStatus(peer.ID) + // We may have written this peering to the store to trigger xDS updates, but still in the process of establishing. + // If there isn't a secret yet, we're still trying to reach the other server. + logger.Trace("reading peering secret", "sequence_id", seq) + secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID) + if err != nil { + return fmt.Errorf("failed to read secret for peering: %w", err) + } + if secret.GetStream().GetActiveSecretID() == "" { + continue + } - // TODO(peering): If there is new peering data and a connected stream, should we tear down the stream? + status, found := s.peerStreamServer.StreamStatus(peer.ID) if found && status.Connected { // Nothing to do when we already have an active stream to the peer. + // Updated data will only be used if the stream becomes disconnected + // since there's no need to tear down an active stream. continue } logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq) @@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancel() } - if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil { + if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil { // TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs. // Lockable status isn't available here though. Could report it via the peering.Service? logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err) @@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq) // Clean up active streams of peerings that were deleted from the state store. - // TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK? for stream, doneCh := range connectedStreams { if _, ok := stored[stream]; ok { // Active stream is in the state store, nothing to do. 
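
Taken together, the establishment path from PATCH 1 and the leader reconnect loop here share one dial
strategy: an initial wait when routing through gateways, jittered retries against successive ring
addresses until a fixed timeout, and an early bail-out on permission-denied. A self-contained sketch
of that control flow using only the standard library (the jitter helper approximates
lib/retry.NewJitter(25), the addresses are placeholders, and exchange stands in for the
ExchangeSecret RPC):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "math/rand"
        "time"
    )

    const (
        meshGatewayWait      = 350 * time.Millisecond
        establishmentTimeout = 5 * time.Second
        retryWait            = 150 * time.Millisecond
    )

    // jitter approximates lib/retry.NewJitter(25): up to 25% of extra wait.
    func jitter(d time.Duration) time.Duration {
        return d + time.Duration(rand.Int63n(int64(d)/4+1))
    }

    var errPermissionDenied = errors.New("permission denied")

    func main() {
        // Placeholder addresses; the real code cycles the ring buffer from
        // GetDialAddresses, which puts gateway addresses before server addresses.
        addrs := []string{"gw-1:8443", "srv-1:8502"}
        usingGateways := true

        // When dialing through local gateways, wait before the first attempt so
        // the gateways have time to configure a route to the peer servers.
        if usingGateways {
            time.Sleep(meshGatewayWait)
        }

        ctx, cancel := context.WithTimeout(context.Background(), establishmentTimeout)
        defer cancel()

        // exchange stands in for the ExchangeSecret RPC against one address.
        exchange := func(addr string) error { return errors.New("connection refused") }

        for i := 0; ctx.Err() == nil; i++ {
            addr := addrs[i%len(addrs)] // cycle like ringBuf.Next()
            err := exchange(addr)
            if err == nil {
                fmt.Println("peering established via", addr)
                return
            }
            if errors.Is(err, errPermissionDenied) {
                // The establishment secret is invalid; retrying cannot help,
                // so surface the need for a fresh peering token immediately.
                fmt.Println("a new peering token must be generated")
                return
            }
            time.Sleep(jitter(retryWait))
        }
        fmt.Println("timed out dialing peer")
    }
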
@@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, return merr.ErrorOrNil() } -func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error { +func (s *Server) establishStream(ctx context.Context, + logger hclog.Logger, + peer *pbpeering.Peering, + secret *pbpeering.PeeringSecrets, + cancelFns map[string]context.CancelFunc) error { logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID) if peer.PeerID == "" { @@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me return fmt.Errorf("failed to build TLS dial option from peering: %w", err) } - secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID) - if err != nil { - return fmt.Errorf("failed to read secret for peering: %w", err) - } if secret.GetStream().GetActiveSecretID() == "" { return errors.New("missing stream secret for peering stream authorization, peering must be re-established") } @@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me // Start a goroutine to watch for updates to peer server addresses. // The latest valid server address can be received from nextServerAddr. nextServerAddr := make(chan string) - go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr) + go s.watchAddresses(streamCtx, peer.ID, nextServerAddr) // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. go retryLoopBackoffPeering(streamCtx, logger, func() error { // Try a new address on each iteration by advancing the ring buffer on errors. - addr := <-nextServerAddr + addr, stillOpen := <-nextServerAddr + if !stillOpen { + // If the channel was closed that means the context was canceled, so we return. + return streamCtx.Err() + } opts := []grpc.DialOption{ tlsOption, + // TODO(peering): Use a grpc.WithStatsHandler here. + // This should wait until the grpc-external server is wired up with a stats handler in NET-50. // For keep alive parameters there is a larger comment in ClientConnPool.dial about that. grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, @@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me }), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)), } - // TODO(peering): use a grpc.WithStatsHandler here?) + logger.Trace("dialing peer", "addr", addr) conn, err := grpc.DialContext(streamCtx, addr, opts...) @@ -400,83 +416,75 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me }, func(err error) { // TODO(peering): why are we using TrackSendError here? This could also be a receive error. 
streamStatus.TrackSendError(err.Error()) - if isErrCode(err, codes.FailedPrecondition) { + + switch { + case isErrCode(err, codes.FailedPrecondition): logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting", "error", err) - return - } else if isErrCode(err, codes.ResourceExhausted) { + + case isErrCode(err, codes.ResourceExhausted): logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting", "error", err) - return + + case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded): + logger.Debug("stream context was canceled", "error", err) + + case err != nil: + logger.Error("error managing peering stream", "error", err) } - logger.Error("error managing peering stream", "error", err) }, peeringRetryTimeout) return nil } -// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr. -// It loads the server addresses into a ring buffer and cycles through them until: -// 1. streamCtx is cancelled (peer is deleted) -// 2. the peer is modified and the watchset fires. +// watchAddresses sends an up-to-date address to nextServerAddr. +// These could be either remote peer server addresses, or local mesh gateways. +// The function loads the addresses into a ring buffer and cycles through them until: +// 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data) +// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires. // -// In case (2) we refetch the peering and rebuild the ring buffer. -func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) { +// In case (2) we re-fetch all the data sources and rebuild the ring buffer. +// In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with +// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses. +func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) { defer close(nextServerAddr) - // we initialize the ring buffer with the peer passed to `establishStream` - // because the caller has pre-checked `peer.ShouldDial`, guaranteeing - // at least one server address. - // - // IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block. - ringbuf := ring.New(len(peer.PeerServerAddresses)) - for _, addr := range peer.PeerServerAddresses { - ringbuf.Value = addr - ringbuf = ringbuf.Next() - } - innerWs := memdb.NewWatchSet() - _, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID) - if err != nil { - s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.", - "peer_id", peer.ID, - "error", err) - } + var ringbuf *ring.Ring + var ws memdb.WatchSet - fetchAddrs := func() error { - // reinstantiate innerWs to prevent it from growing indefinitely - innerWs = memdb.NewWatchSet() - _, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID) + fetchAddresses := func() error { + // Re-instantiate ws since it can only be watched once. 
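+		// This also keeps the watch set from growing indefinitely across refreshes.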
+ ws = memdb.NewWatchSet() + + newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID) if err != nil { - return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err) - } - if !peering.IsActive() { - return fmt.Errorf("peer %q is no longer active", peer.ID) - } - if len(peering.PeerServerAddresses) == 0 { - return fmt.Errorf("peer %q has no addresses to dial", peer.ID) + return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err) } + ringbuf = newRing - ringbuf = ring.New(len(peering.PeerServerAddresses)) - for _, addr := range peering.PeerServerAddresses { - ringbuf.Value = addr - ringbuf = ringbuf.Next() - } return nil } + // Initialize the first ring buffer. + if err := fetchAddresses(); err != nil { + s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err) + } + for { select { case nextServerAddr <- ringbuf.Value.(string): ringbuf = ringbuf.Next() - case err := <-innerWs.WatchCh(ctx): + + case err := <-ws.WatchCh(ctx): if err != nil { - // context was cancelled + // Context was cancelled. return } - // watch fired so we refetch the peering and rebuild the ring buffer - if err := fetchAddrs(); err != nil { - s.logger.Warn("watchset for peer was fired but failed to update server addresses", - "peer_id", peer.ID, + + // Watch fired so we re-fetch the necessary addresses and replace the ring buffer. + if err := fetchAddresses(); err != nil { + s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated", + "peer_id", peerID, "error", err) } } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 373d80f25d..fca4bfeaae 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -8,10 +8,12 @@ import ( "fmt" "io/ioutil" "math" + "net" "testing" "time" "github.com/armon/go-metrics" + "github.com/google/tcpproxy" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" @@ -1861,3 +1863,239 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) { }) }) } + +func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + ca := connect.TestCA(t, nil) + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCTLSPort = freeport.GetOne(t) + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{ + "PrivateKey": ca.SigningKey, + "RootCert": ca.RootCert, + }, + } + }) + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + acceptorClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-dialer", + } + resp, err := acceptorClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. 
+ _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, dialer.RPC, "dc2") + + // Configure peering to go through mesh gateways + store := dialer.fsm.State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a gateway that isn't actually listening. + require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: freeport.GetOne(t), + }, + })) + + // Create a peering at dialer by establishing a peering with acceptor's token + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + dialerClient := pbpeering.NewPeeringServiceClient(conn) + + establishReq := pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: resp.PeeringToken, + } + _, err = dialerClient.Establish(ctx, &establishReq) + require.NoError(t, err) + + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) + require.NoError(t, err) + + // The peering should eventually connect because we fall back to the token's server addresses. + retry.Run(t, func(r *retry.R) { + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) +} + +func Test_Leader_PeeringSync_PeerThroughMeshGateways_Success(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + ca := connect.TestCA(t, nil) + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCTLSPort = freeport.GetOne(t) + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{ + "PrivateKey": ca.SigningKey, + "RootCert": ca.RootCert, + }, + } + }) + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + acceptorClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-dialer", + } + resp, err := acceptorClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. 
+ _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, dialer.RPC, "dc2") + + // Configure peering to go through mesh gateways + store := dialer.fsm.State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a mesh gateway and a tcpproxy listening at its address. + gatewayPort := freeport.GetOne(t) + gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort) + + require.NoError(t, store.EnsureRegistration(3, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-2", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-2", + Service: "mesh-gateway", + Port: gatewayPort, + }, + })) + + // Configure a TCP proxy with an SNI route corresponding to the acceptor cluster. + var proxy tcpproxy.Proxy + target := &connWrapper{ + proxy: tcpproxy.DialProxy{ + Addr: fmt.Sprintf("127.0.0.1:%d", acceptor.config.GRPCTLSPort), + }, + } + proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target) + proxy.AddStopACMESearch(gatewayAddr) + + require.NoError(t, proxy.Start()) + t.Cleanup(func() { + proxy.Close() + proxy.Wait() + }) + + // Create a peering at dialer by establishing a peering with acceptor's token + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + dialerClient := pbpeering.NewPeeringServiceClient(conn) + + establishReq := pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: resp.PeeringToken, + } + _, err = dialerClient.Establish(ctx, &establishReq) + require.NoError(t, err) + + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) + require.NoError(t, err) + + // The peering should eventually connect through the gateway address. + retry.Run(t, func(r *retry.R) { + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + // target.called is true when the tcproxy's conn handler was invoked. + // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway. + require.True(t, target.called) +} + +// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection. 
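+// Tests assert the called flag to confirm that traffic actually flowed through the
+// proxy rather than being dialed directly.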
+type connWrapper struct { + proxy tcpproxy.DialProxy + called bool +} + +func (w *connWrapper) HandleConn(src net.Conn) { + w.called = true + w.proxy.HandleConn(src) +} diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 064015cb67..d9daeaea15 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -139,12 +139,7 @@ func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID stri if !peering.IsActive() { return nil, fmt.Errorf("there is no active peering for %q", peerID) } - - // IMPORTANT: The address ring buffer must always be length > 0 - if len(peering.PeerServerAddresses) == 0 { - return nil, fmt.Errorf("peer %q has no addresses to dial", peerID) - } - return bufferFromAddresses(peering.PeerServerAddresses), nil + return bufferFromAddresses(peering.PeerServerAddresses) } // maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the @@ -157,12 +152,10 @@ func (b *PeeringBackend) maybeFetchGatewayAddresses(ws memdb.WatchSet) (*ring.Ri } if useGateways { addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false) - - // IMPORTANT: The address ring buffer must always be length > 0 - if err != nil || len(addresses) == 0 { + if err != nil { return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err) } - return bufferFromAddresses(addresses), nil + return bufferFromAddresses(addresses) } return nil, nil } @@ -182,13 +175,17 @@ func (b *PeeringBackend) PeerThroughMeshGateways(ws memdb.WatchSet) (bool, error } -func bufferFromAddresses(addresses []string) *ring.Ring { +func bufferFromAddresses(addresses []string) (*ring.Ring, error) { + // IMPORTANT: The address ring buffer must always be length > 0 + if len(addresses) == 0 { + return nil, fmt.Errorf("no known addresses") + } ring := ring.New(len(addresses)) for _, addr := range addresses { ring.Value = addr ring = ring.Next() } - return ring + return ring, nil } func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) { diff --git a/agent/consul/peering_backend_test.go b/agent/consul/peering_backend_test.go index 63a42e07a9..a962bdefc0 100644 --- a/agent/consul/peering_backend_test.go +++ b/agent/consul/peering_backend_test.go @@ -232,7 +232,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: acceptorPeerID, expect: expectation{ - err: fmt.Sprintf(`peer %q has no addresses to dial`, acceptorPeerID), + err: "no known addresses", }, }, { From 472a8e82dc468111cb4ddfb8bd3fcef80a628c56 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 10 Oct 2022 13:55:28 -0600 Subject: [PATCH 3/8] Add integ test for peering through gateways --- .../alpha/base.hcl | 5 ++ .../alpha/config_entries.hcl | 32 +++++++++ .../alpha/service_gateway.hcl | 5 ++ .../alpha/service_s1.hcl | 1 + .../alpha/service_s2.hcl | 7 ++ .../alpha/setup.sh | 12 ++++ .../alpha/verify.bats | 49 +++++++++++++ .../bind.hcl | 2 + .../capture.sh | 4 ++ .../primary/base.hcl | 3 + .../primary/config_entries.hcl | 18 +++++ .../primary/service_gateway.hcl | 5 ++ .../primary/service_s1.hcl | 20 ++++++ .../primary/service_s2.hcl | 1 + .../primary/setup.sh | 11 +++ .../primary/verify.bats | 69 +++++++++++++++++++ .../case-cross-peer-control-plane-mgw/vars.sh | 4 ++ 17 files changed, 248 insertions(+) create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl 
create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats create mode 100644 test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl new file mode 100644 index 0000000000..f81ab0edd6 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl @@ -0,0 +1,5 @@ +primary_datacenter = "alpha" +log_level = "trace" +peering { + enabled = true +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl new file mode 100644 index 0000000000..996df1d213 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl @@ -0,0 +1,32 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "tcp" + } + }, + { + kind = "mesh" + peering { + peer_through_mesh_gateways = true + } + }, + { + kind = "exported-services" + name = "default" + services = [ + { + name = "s2" + consumers = [ + { + peer = "alpha-to-primary" + } + ] + } + ] + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl new file mode 100644 index 0000000000..bcdcb2e8b3 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4432 +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl new file mode 100644 index 0000000000..e97ec23666 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl @@ -0,0 +1 @@ +# We don't want an s1 service in this peer 
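
The config_entries.hcl files above bootstrap the mesh config entry that turns on peering through
gateways (peer_through_mesh_gateways = true). The same entry can be written at runtime with the Go
API client; a hedged sketch, assuming the consul/api bindings for the mesh entry (MeshConfigEntry,
PeeringMeshConfig, and the MeshConfig kind constant) available at this point in the tree:

    package main

    import (
        "fmt"
        "log"

        "github.com/hashicorp/consul/api"
    )

    func main() {
        client, err := api.NewClient(api.DefaultConfig())
        if err != nil {
            log.Fatal(err)
        }

        // Route peering control-plane traffic through local mesh gateways,
        // mirroring the peer_through_mesh_gateways bootstrap entry above.
        entry := &api.MeshConfigEntry{
            Peering: &api.PeeringMeshConfig{PeerThroughMeshGateways: true},
        }
        if _, _, err := client.ConfigEntries().Set(entry, nil); err != nil {
            log.Fatal(err)
        }

        // Read the entry back to confirm the setting took effect.
        raw, _, err := client.ConfigEntries().Get(api.MeshConfig, "mesh", nil)
        if err != nil {
            log.Fatal(err)
        }
        mesh := raw.(*api.MeshConfigEntry)
        fmt.Println("peer through mesh gateways:", mesh.Peering.PeerThroughMeshGateways)
    }
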
diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl new file mode 100644 index 0000000000..01d4505c67 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl @@ -0,0 +1,7 @@ +services { + name = "s2" + port = 8181 + connect { + sidecar_service {} + } +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh new file mode 100644 index 0000000000..6d341b20a2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -euo pipefail + +register_services alpha + +gen_envoy_bootstrap s2 19002 alpha +gen_envoy_bootstrap mesh-gateway 19003 alpha true + +wait_for_config_entry proxy-defaults global alpha +wait_for_config_entry exported-services default alpha +wait_for_config_entry mesh mesh alpha diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats new file mode 100644 index 0000000000..06314df903 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats @@ -0,0 +1,49 @@ +#!/usr/bin/env bats + +load helpers + +@test "s2 proxy is running correct version" { + assert_envoy_version 19002 +} + +@test "s2 proxy admin is up on :19002" { + retry_default curl -f -s localhost:19002/stats -o /dev/null +} + +@test "gateway-alpha proxy admin is up on :19003" { + retry_default curl -f -s localhost:19003/stats -o /dev/null +} + +@test "s2 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s2 alpha +} + +@test "s2 proxy should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "gateway-alpha should be up and listening" { + retry_long nc -z consul-alpha-client:4432 +} + +@test "s2 proxies should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "dialer gateway-alpha should have healthy endpoints for alpha servers" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.alpha.peering HEALTHY 1 +} + +@test "dialer gateway-alpha should have healthy endpoints for primary servers" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.primary.peering HEALTHY 1 +} + +# Re-peering the clusters is a way to have alpha dial out through its own gateway +# since we know it is configured with endpoints for primary from the first time they peered. 
+@test "re-peer the two clusters together" { + create_peering primary alpha +} + +@test "alpha servers made connection to primary servers via alpha gateway" { + assert_envoy_metric_at_least 127.0.0.1:19003 "cluster.server.primary.peering.*cx_total" 1 +} \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl new file mode 100644 index 0000000000..f54393f03e --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl @@ -0,0 +1,2 @@ +bind_addr = "0.0.0.0" +advertise_addr = "{{ GetInterfaceIP \"eth0\" }}" \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh new file mode 100644 index 0000000000..f7205ae515 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +snapshot_envoy_admin localhost:19001 mesh-gateway primary || true +snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl new file mode 100644 index 0000000000..c1e134d5a2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl @@ -0,0 +1,3 @@ +peering { + enabled = true +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl new file mode 100644 index 0000000000..6baeb569f2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl @@ -0,0 +1,18 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "tcp" + } + }, + { + kind = "mesh" + peering { + peer_through_mesh_gateways = true + } + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl new file mode 100644 index 0000000000..831a70ff32 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4431 +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl new file mode 100644 index 0000000000..af0773c29a --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl @@ -0,0 +1,20 @@ +services { + name = "s1" + port = 8080 + connect { + sidecar_service { + proxy { + upstreams = [ + { + destination_name = "s2" + destination_peer = "primary-to-alpha" + local_bind_port = 5000 + mesh_gateway { + mode = "local" + } + } + ] + } + } + } +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl new file mode 100644 index 0000000000..77164e722b --- /dev/null +++ 
b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl
@@ -0,0 +1 @@
+# We don't want an s2 service in the primary dc
\ No newline at end of file
diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh
new file mode 100644
index 0000000000..3aa37f8cb0
--- /dev/null
+++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -euo pipefail
+
+register_services primary
+
+gen_envoy_bootstrap s1 19000 primary
+gen_envoy_bootstrap mesh-gateway 19001 primary true
+
+wait_for_config_entry proxy-defaults global
+wait_for_config_entry mesh mesh
diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats
new file mode 100644
index 0000000000..bc21d5ee3f
--- /dev/null
+++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats
@@ -0,0 +1,69 @@
+#!/usr/bin/env bats
+
+load helpers
+
+@test "s1 proxy is running correct version" {
+  assert_envoy_version 19000
+}
+
+@test "s1 proxy admin is up on :19000" {
+  retry_default curl -f -s localhost:19000/stats -o /dev/null
+}
+
+@test "gateway-primary proxy admin is up on :19001" {
+  retry_default curl -f -s localhost:19001/stats -o /dev/null
+}
+
+@test "s1 proxy listener should be up and have right cert" {
+  assert_proxy_presents_cert_uri localhost:21000 s1
+}
+
+@test "s2 proxies should be healthy in alpha" {
+  assert_service_has_healthy_instances s2 1 alpha
+}
+
+@test "gateway-primary should be up and listening" {
+  retry_long nc -z consul-primary-client:4431
+}
+
+@test "gateway-alpha should be up and listening" {
+  retry_long nc -z consul-alpha-client:4432
+}
+
+@test "peer the two clusters together" {
+  create_peering primary alpha
+}
+
+@test "acceptor gateway-primary should have healthy endpoints for primary servers" {
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19001 server.primary.peering HEALTHY 1
+}
+
+@test "alpha servers made connection to primary servers via primary gateway" {
+  assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.server.primary.peering.*cx_total" 1
+}
+
+@test "s2 alpha proxies should be healthy in primary" {
+  assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha
+}
+
+@test "gateway-alpha should have healthy endpoints for s2" {
+  assert_upstream_has_endpoints_in_status consul-alpha-client:19003 exported~s2.default.alpha HEALTHY 1
+}
+
+@test "s1 upstream should have healthy endpoints for s2" {
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2.default.primary-to-alpha.external HEALTHY 1
+}
+
+@test "s1 upstream should be able to connect to s2" {
+  run retry_default curl -s -f -d hello localhost:5000
+  [ "$status" -eq 0 ]
+  [ "$output" = "hello" ]
+}
+
+@test "s1 upstream made 1 connection to s2" {
+  assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.primary-to-alpha.external.*cx_total" 1
+}
+
+@test "s1 upstream made 1 connection to s2 through the primary mesh gateway" {
+  assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.s2.default.default.alpha-to-primary.external.*cx_total" 1
+}
diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh
new file mode 100644
index 0000000000..c856de1733
--- /dev/null
+++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+export REQUIRED_SERVICES="s1 s1-sidecar-proxy gateway-primary s2-alpha s2-sidecar-proxy-alpha gateway-alpha"
+export REQUIRE_PEERS=1

From 96fdd3728a43c2391b4a9fc8c81e0311f4ef65e7 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Mon, 10 Oct 2022 13:56:38 -0600
Subject: [PATCH 4/8] Fix CA init error code

---
 agent/grpc-external/services/peerstream/stream_resources.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go
index 281156a222..bf9d1b791a 100644
--- a/agent/grpc-external/services/peerstream/stream_resources.go
+++ b/agent/grpc-external/services/peerstream/stream_resources.go
@@ -709,7 +709,7 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
 		return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
 	case cfg == nil:
 		logger.Warn("cannot begin stream because Connect CA is not yet initialized")
-		return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
+		return "", grpcstatus.Error(codes.Unavailable, "Connect CA is not yet initialized")
 	}
 	return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
 }

From 573aa408a15d181c90d1b311045fb2670c4984b4 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 13 Oct 2022 15:55:55 -0600
Subject: [PATCH 5/8] Lint

---
 agent/consul/leader_peering.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index c9f3394983..bfa24c6ce1 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -365,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context,
 		}),
 		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
 	}
-	
+
 	logger.Trace("dialing peer", "addr", addr)
 
 	conn, err := grpc.DialContext(streamCtx, addr, opts...)

From f48d7fbe043407c7f4d84b1ed56b719954949cd3 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 13 Oct 2022 16:03:15 -0600
Subject: [PATCH 6/8] Add changelog entry

---
 .changelog/14981.txt | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .changelog/14981.txt

diff --git a/.changelog/14981.txt b/.changelog/14981.txt
new file mode 100644
index 0000000000..ef1b4d6014
--- /dev/null
+++ b/.changelog/14981.txt
@@ -0,0 +1,3 @@
+```release-note:feature
+peering: add support for routing peering control-plane traffic through mesh gateways
+```
\ No newline at end of file

From bf51021c070b6cd4125900d8888ec25e38203012 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 13 Oct 2022 16:54:14 -0600
Subject: [PATCH 7/8] Use split wildcard partition name

This way OSS builds avoid passing a non-empty partition label, which an
OSS Consul server would reject.
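To make the build-tag trick concrete, below is a minimal runnable sketch of
the split-constant pattern. Only the OSS constant values mirror the
acl/acl_oss.go change in this diff; the package main harness and the
Enterprise value mentioned in the comments are assumptions for
illustration, not Consul source.

```go
// Sketch of the split wildcard-partition pattern. In an OSS build the
// wildcard and the default partition are both the empty string, so code
// that fills in acl.WildcardPartitionName never sends a partition label
// that an OSS server would reject. An Enterprise build would presumably
// compile the wildcard to "*" instead.
package main

import "fmt"

const (
	WildcardPartitionName = "" // OSS value, mirroring acl/acl_oss.go below
	DefaultPartitionName  = ""
)

func main() {
	// Analogous to the PeeringListRequest change in this commit: using
	// the constant instead of the literal "*" yields "" under OSS.
	fmt.Printf("partition label sent by OSS: %q\n", WildcardPartitionName)
}
```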
--- acl/acl_oss.go | 3 ++- agent/proxycfg/mesh_gateway.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/acl/acl_oss.go b/acl/acl_oss.go index 6932808831..48f671ac7a 100644 --- a/acl/acl_oss.go +++ b/acl/acl_oss.go @@ -4,7 +4,8 @@ package acl const ( - DefaultPartitionName = "" + WildcardPartitionName = "" + DefaultPartitionName = "" ) // Reviewer Note: This is a little bit strange; one might want it to be "" like partition name diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index 1378f9b9bf..63747a9404 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/go-hclog" cachetype "github.com/hashicorp/consul/agent/cache-types" @@ -564,7 +565,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn peeringListCtx, cancel := context.WithCancel(ctx) err := s.dataSources.PeeringList.Notify(peeringListCtx, &cachetype.PeeringListRequest{ Request: &pbpeering.PeeringListRequest{ - Partition: structs.WildcardSpecifier, + Partition: acl.WildcardPartitionName, }, }, peerServersWatchID, s.ch) if err != nil { From c77123a2aa13c9e0214c1ec42af775213c3c42ad Mon Sep 17 00:00:00 2001 From: freddygv Date: Thu, 13 Oct 2022 17:12:05 -0600 Subject: [PATCH 8/8] Use split var in tests --- agent/proxycfg/state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index e80ea4e63f..dec7f613f8 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -246,7 +246,7 @@ func genVerifyPeeringListWatchForMeshGateway() verifyWatchRequest { return func(t testing.TB, request any) { reqReal, ok := request.(*cachetype.PeeringListRequest) require.True(t, ok) - require.Equal(t, structs.WildcardSpecifier, reqReal.Request.Partition) + require.Equal(t, acl.WildcardPartitionName, reqReal.Request.Partition) } }
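
As an editorial aside, the verifier closure touched in this last test change
follows a pattern worth seeing in isolation. A hedged, self-contained sketch
follows: the closure shape mirrors genVerifyPeeringListWatchForMeshGateway
above, while the simplified request type and the main harness are
assumptions, not Consul source.

```go
// Standalone sketch of the request-verifier closures used in
// agent/proxycfg/state_test.go. peeringListRequest is a simplified
// stand-in for cachetype.PeeringListRequest.
package main

import "fmt"

type peeringListRequest struct {
	Partition string
}

// verifyWatchRequest mirrors the closure type returned by the genVerify*
// helpers in the test file, with errors in place of testing assertions.
type verifyWatchRequest func(request any) error

func genVerifyPeeringListWatch(wantPartition string) verifyWatchRequest {
	return func(request any) error {
		req, ok := request.(*peeringListRequest)
		if !ok {
			return fmt.Errorf("unexpected request type %T", request)
		}
		if req.Partition != wantPartition {
			return fmt.Errorf("partition: got %q, want %q", req.Partition, wantPartition)
		}
		return nil
	}
}

func main() {
	// After this patch the mesh gateway watch is verified against the OSS
	// wildcard partition, i.e. the empty string.
	verify := genVerifyPeeringListWatch("")
	fmt.Println(verify(&peeringListRequest{Partition: ""})) // <nil>
}
```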