diff --git a/.changelog/14981.txt b/.changelog/14981.txt new file mode 100644 index 0000000000..ef1b4d6014 --- /dev/null +++ b/.changelog/14981.txt @@ -0,0 +1,3 @@ +```release-note:feature +peering: add support for routine peering control-plane traffic through mesh gateways +``` \ No newline at end of file diff --git a/acl/acl_oss.go b/acl/acl_oss.go index 6932808831..48f671ac7a 100644 --- a/acl/acl_oss.go +++ b/acl/acl_oss.go @@ -4,7 +4,8 @@ package acl const ( - DefaultPartitionName = "" + WildcardPartitionName = "" + DefaultPartitionName = "" ) // Reviewer Note: This is a little bit strange; one might want it to be "" like partition name diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 5bdcfe500c..bfa24c6ce1 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, continue } - status, found := s.peerStreamServer.StreamStatus(peer.ID) + // We may have written this peering to the store to trigger xDS updates, but still in the process of establishing. + // If there isn't a secret yet, we're still trying to reach the other server. + logger.Trace("reading peering secret", "sequence_id", seq) + secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID) + if err != nil { + return fmt.Errorf("failed to read secret for peering: %w", err) + } + if secret.GetStream().GetActiveSecretID() == "" { + continue + } - // TODO(peering): If there is new peering data and a connected stream, should we tear down the stream? + status, found := s.peerStreamServer.StreamStatus(peer.ID) if found && status.Connected { // Nothing to do when we already have an active stream to the peer. + // Updated data will only be used if the stream becomes disconnected + // since there's no need to tear down an active stream. continue } logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq) @@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancel() } - if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil { + if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil { // TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs. // Lockable status isn't available here though. Could report it via the peering.Service? logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err) @@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq) // Clean up active streams of peerings that were deleted from the state store. - // TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK? for stream, doneCh := range connectedStreams { if _, ok := stored[stream]; ok { // Active stream is in the state store, nothing to do. 
@@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, return merr.ErrorOrNil() } -func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error { +func (s *Server) establishStream(ctx context.Context, + logger hclog.Logger, + peer *pbpeering.Peering, + secret *pbpeering.PeeringSecrets, + cancelFns map[string]context.CancelFunc) error { logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID) if peer.PeerID == "" { @@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me return fmt.Errorf("failed to build TLS dial option from peering: %w", err) } - secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID) - if err != nil { - return fmt.Errorf("failed to read secret for peering: %w", err) - } if secret.GetStream().GetActiveSecretID() == "" { return errors.New("missing stream secret for peering stream authorization, peering must be re-established") } @@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me // Start a goroutine to watch for updates to peer server addresses. // The latest valid server address can be received from nextServerAddr. nextServerAddr := make(chan string) - go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr) + go s.watchAddresses(streamCtx, peer.ID, nextServerAddr) // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. go retryLoopBackoffPeering(streamCtx, logger, func() error { // Try a new address on each iteration by advancing the ring buffer on errors. - addr := <-nextServerAddr + addr, stillOpen := <-nextServerAddr + if !stillOpen { + // If the channel was closed that means the context was canceled, so we return. + return streamCtx.Err() + } opts := []grpc.DialOption{ tlsOption, + // TODO(peering): Use a grpc.WithStatsHandler here. + // This should wait until the grpc-external server is wired up with a stats handler in NET-50. // For keep alive parameters there is a larger comment in ClientConnPool.dial about that. grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, @@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me }), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)), } - // TODO(peering): use a grpc.WithStatsHandler here?) + logger.Trace("dialing peer", "addr", addr) conn, err := grpc.DialContext(streamCtx, addr, opts...) @@ -400,83 +416,75 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me }, func(err error) { // TODO(peering): why are we using TrackSendError here? This could also be a receive error. 
streamStatus.TrackSendError(err.Error()) - if isErrCode(err, codes.FailedPrecondition) { + + switch { + case isErrCode(err, codes.FailedPrecondition): logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting", "error", err) - return - } else if isErrCode(err, codes.ResourceExhausted) { + + case isErrCode(err, codes.ResourceExhausted): logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting", "error", err) - return + + case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded): + logger.Debug("stream context was canceled", "error", err) + + case err != nil: + logger.Error("error managing peering stream", "error", err) } - logger.Error("error managing peering stream", "error", err) }, peeringRetryTimeout) return nil } -// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr. -// It loads the server addresses into a ring buffer and cycles through them until: -// 1. streamCtx is cancelled (peer is deleted) -// 2. the peer is modified and the watchset fires. +// watchAddresses sends an up-to-date address to nextServerAddr. +// These could be either remote peer server addresses, or local mesh gateways. +// The function loads the addresses into a ring buffer and cycles through them until: +// 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data) +// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires. // -// In case (2) we refetch the peering and rebuild the ring buffer. -func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) { +// In case (2) we re-fetch all the data sources and rebuild the ring buffer. +// In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with +// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses. +func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) { defer close(nextServerAddr) - // we initialize the ring buffer with the peer passed to `establishStream` - // because the caller has pre-checked `peer.ShouldDial`, guaranteeing - // at least one server address. - // - // IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block. - ringbuf := ring.New(len(peer.PeerServerAddresses)) - for _, addr := range peer.PeerServerAddresses { - ringbuf.Value = addr - ringbuf = ringbuf.Next() - } - innerWs := memdb.NewWatchSet() - _, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID) - if err != nil { - s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.", - "peer_id", peer.ID, - "error", err) - } + var ringbuf *ring.Ring + var ws memdb.WatchSet - fetchAddrs := func() error { - // reinstantiate innerWs to prevent it from growing indefinitely - innerWs = memdb.NewWatchSet() - _, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID) + fetchAddresses := func() error { + // Re-instantiate ws since it can only be watched once. 
+ ws = memdb.NewWatchSet() + + newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID) if err != nil { - return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err) - } - if !peering.IsActive() { - return fmt.Errorf("peer %q is no longer active", peer.ID) - } - if len(peering.PeerServerAddresses) == 0 { - return fmt.Errorf("peer %q has no addresses to dial", peer.ID) + return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err) } + ringbuf = newRing - ringbuf = ring.New(len(peering.PeerServerAddresses)) - for _, addr := range peering.PeerServerAddresses { - ringbuf.Value = addr - ringbuf = ringbuf.Next() - } return nil } + // Initialize the first ring buffer. + if err := fetchAddresses(); err != nil { + s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err) + } + for { select { case nextServerAddr <- ringbuf.Value.(string): ringbuf = ringbuf.Next() - case err := <-innerWs.WatchCh(ctx): + + case err := <-ws.WatchCh(ctx): if err != nil { - // context was cancelled + // Context was cancelled. return } - // watch fired so we refetch the peering and rebuild the ring buffer - if err := fetchAddrs(); err != nil { - s.logger.Warn("watchset for peer was fired but failed to update server addresses", - "peer_id", peer.ID, + + // Watch fired so we re-fetch the necessary addresses and replace the ring buffer. + if err := fetchAddresses(); err != nil { + s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated", + "peer_id", peerID, "error", err) } } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 5f59787764..fca4bfeaae 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -8,10 +8,12 @@ import ( "fmt" "io/ioutil" "math" + "net" "testing" "time" "github.com/armon/go-metrics" + "github.com/google/tcpproxy" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" @@ -567,7 +569,7 @@ func testLeader_PeeringSync_failsForTLSError(t *testing.T, tokenMutateFn func(to } // Since the Establish RPC dials the remote cluster, it will yield the TLS error. 
- ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) _, err = s2Client.Establish(ctx, &establishReq) require.Contains(t, err.Error(), expectErr) @@ -1861,3 +1863,239 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) { }) }) } + +func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + ca := connect.TestCA(t, nil) + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCTLSPort = freeport.GetOne(t) + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{ + "PrivateKey": ca.SigningKey, + "RootCert": ca.RootCert, + }, + } + }) + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + acceptorClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-dialer", + } + resp, err := acceptorClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. + _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, dialer.RPC, "dc2") + + // Configure peering to go through mesh gateways + store := dialer.fsm.State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a gateway that isn't actually listening. + require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: freeport.GetOne(t), + }, + })) + + // Create a peering at dialer by establishing a peering with acceptor's token + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + dialerClient := pbpeering.NewPeeringServiceClient(conn) + + establishReq := pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: resp.PeeringToken, + } + _, err = dialerClient.Establish(ctx, &establishReq) + require.NoError(t, err) + + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) + require.NoError(t, err) + + // The peering should eventually connect because we fall back to the token's server addresses. 
+ retry.Run(t, func(r *retry.R) { + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) +} + +func Test_Leader_PeeringSync_PeerThroughMeshGateways_Success(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + ca := connect.TestCA(t, nil) + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCTLSPort = freeport.GetOne(t) + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{ + "PrivateKey": ca.SigningKey, + "RootCert": ca.RootCert, + }, + } + }) + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + acceptorClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-dialer", + } + resp, err := acceptorClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. + _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, dialer.RPC, "dc2") + + // Configure peering to go through mesh gateways + store := dialer.fsm.State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a mesh gateway and a tcpproxy listening at its address. + gatewayPort := freeport.GetOne(t) + gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort) + + require.NoError(t, store.EnsureRegistration(3, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-2", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-2", + Service: "mesh-gateway", + Port: gatewayPort, + }, + })) + + // Configure a TCP proxy with an SNI route corresponding to the acceptor cluster. 
+ var proxy tcpproxy.Proxy + target := &connWrapper{ + proxy: tcpproxy.DialProxy{ + Addr: fmt.Sprintf("127.0.0.1:%d", acceptor.config.GRPCTLSPort), + }, + } + proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target) + proxy.AddStopACMESearch(gatewayAddr) + + require.NoError(t, proxy.Start()) + t.Cleanup(func() { + proxy.Close() + proxy.Wait() + }) + + // Create a peering at dialer by establishing a peering with acceptor's token + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + dialerClient := pbpeering.NewPeeringServiceClient(conn) + + establishReq := pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: resp.PeeringToken, + } + _, err = dialerClient.Establish(ctx, &establishReq) + require.NoError(t, err) + + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) + require.NoError(t, err) + + // The peering should eventually connect through the gateway address. + retry.Run(t, func(r *retry.R) { + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + // target.called is true when the tcproxy's conn handler was invoked. + // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway. + require.True(t, target.called) +} + +// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection. +type connWrapper struct { + proxy tcpproxy.DialProxy + called bool +} + +func (w *connWrapper) HandleConn(src net.Conn) { + w.called = true + w.proxy.HandleConn(src) +} diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 26c2f19436..d9daeaea15 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -1,12 +1,16 @@ package consul import ( + "container/ring" "encoding/base64" "encoding/json" "fmt" "strconv" "sync" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/connect" @@ -85,29 +89,114 @@ func (b *PeeringBackend) GetTLSMaterials(generatingToken bool) (string, []string return serverName, caPems, nil } -// GetServerAddresses looks up server or mesh gateway addresses from the state store. -func (b *PeeringBackend) GetServerAddresses() ([]string, error) { - _, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta()) - if err != nil { - return nil, fmt.Errorf("failed to read mesh config entry: %w", err) - } +// GetLocalServerAddresses looks up server or mesh gateway addresses from the state store for a peer to dial. +func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) { + store := b.srv.fsm.State() - meshConfig, ok := rawEntry.(*structs.MeshConfigEntry) - if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways { - return meshGatewayAdresses(b.srv.fsm.State()) + useGateways, err := b.PeerThroughMeshGateways(nil) + if err != nil { + // For inbound traffic we prefer to fail fast if we can't determine whether we should peer through MGW. 
+ // This prevents unexpectedly sharing local server addresses when a user only intended to peer through gateways. + return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err) } - return serverAddresses(b.srv.fsm.State()) + if useGateways { + return meshGatewayAdresses(store, nil, true) + } + return serverAddresses(store) } -func meshGatewayAdresses(state *state.Store) ([]string, error) { - _, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword) +// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, +// a boolean indicating whether mesh gateways are present, and an optional error. +// The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present. +func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) { + newRing, err := b.fetchPeerServerAddresses(ws, peerID) + if err != nil { + return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err) + } + + gatewayRing, err := b.maybeFetchGatewayAddresses(ws) + if err != nil { + // If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses. + logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly", "error", err) + return newRing, false, nil + } + if gatewayRing != nil { + // The ordering is important here. We always want to start with the mesh gateway + // addresses and fall back to the remote addresses, so we append the server addresses + // in newRing to gatewayRing. + newRing = gatewayRing.Link(newRing) + } + return newRing, gatewayRing != nil, nil +} + +// fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses. +// If the peering is no longer active or does not have addresses, then we return an error. +func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID string) (*ring.Ring, error) { + _, peering, err := b.Store().PeeringReadByID(ws, peerID) + if err != nil { + return nil, fmt.Errorf("failed to fetch peer %q: %w", peerID, err) + } + if !peering.IsActive() { + return nil, fmt.Errorf("there is no active peering for %q", peerID) + } + return bufferFromAddresses(peering.PeerServerAddresses) +} + +// maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the +// local datacenter is configured to peer through mesh gateways and there are local gateways registered. +// If neither of these are true then we return a nil buffer. +func (b *PeeringBackend) maybeFetchGatewayAddresses(ws memdb.WatchSet) (*ring.Ring, error) { + useGateways, err := b.PeerThroughMeshGateways(ws) + if err != nil { + return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err) + } + if useGateways { + addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false) + if err != nil { + return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err) + } + return bufferFromAddresses(addresses) + } + return nil, nil +} + +// PeerThroughMeshGateways determines if the config entry to enable peering control plane +// traffic through a mesh gateway is enabled.
+func (b *PeeringBackend) PeerThroughMeshGateways(ws memdb.WatchSet) (bool, error) { + _, rawEntry, err := b.srv.fsm.State().ConfigEntry(ws, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta()) + if err != nil { + return false, fmt.Errorf("failed to read mesh config entry: %w", err) + } + mesh, ok := rawEntry.(*structs.MeshConfigEntry) + if rawEntry != nil && !ok { + return false, fmt.Errorf("got unexpected type for mesh config entry: %T", rawEntry) + } + return mesh.PeerThroughMeshGateways(), nil + +} + +func bufferFromAddresses(addresses []string) (*ring.Ring, error) { + // IMPORTANT: The address ring buffer must always be length > 0 + if len(addresses) == 0 { + return nil, fmt.Errorf("no known addresses") + } + ring := ring.New(len(addresses)) + for _, addr := range addresses { + ring.Value = addr + ring = ring.Next() + } + return ring, nil +} + +func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) { + _, nodes, err := state.ServiceDump(ws, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword) if err != nil { return nil, fmt.Errorf("failed to dump gateway addresses: %w", err) } var addrs []string for _, node := range nodes { - _, addr, port := node.BestAddress(true) + _, addr, port := node.BestAddress(wan) addrs = append(addrs, ipaddr.FormatAddressPort(addr, port)) } if len(addrs) == 0 { diff --git a/agent/consul/peering_backend_test.go b/agent/consul/peering_backend_test.go index 0d834c09a9..a962bdefc0 100644 --- a/agent/consul/peering_backend_test.go +++ b/agent/consul/peering_backend_test.go @@ -10,6 +10,7 @@ import ( gogrpc "google.golang.org/grpc" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" @@ -76,7 +77,7 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) { }) } -func TestPeeringBackend_GetServerAddresses(t *testing.T) { +func TestPeeringBackend_GetLocalServerAddresses(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -91,7 +92,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { backend := NewPeeringBackend(srv) testutil.RunStep(t, "peer to servers", func(t *testing.T) { - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort) @@ -107,7 +108,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) // Still expect server address because PeerThroughMeshGateways was not enabled. 
@@ -121,7 +122,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.Nil(t, addrs) testutil.RequireErrorContains(t, err, "servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered") @@ -147,12 +148,221 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) { } require.NoError(t, srv.fsm.State().EnsureRegistration(2, ®)) - addrs, err := backend.GetServerAddresses() + addrs, err := backend.GetLocalServerAddresses() require.NoError(t, err) require.Equal(t, []string{"154.238.12.252:8443"}, addrs) }) } +func TestPeeringBackend_GetDialAddresses(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, cfg := testServerConfig(t) + cfg.GRPCTLSPort = freeport.GetOne(t) + + srv, err := newServer(t, cfg) + require.NoError(t, err) + testrpc.WaitForLeader(t, srv.RPC, "dc1") + + backend := NewPeeringBackend(srv) + + dialerPeerID := testUUID() + acceptorPeerID := testUUID() + + type expectation struct { + addrs []string + haveGateways bool + err string + } + + type testCase struct { + name string + setup func(store *state.Store) + peerID string + expect expectation + } + + run := func(t *testing.T, tc testCase) { + if tc.setup != nil { + tc.setup(srv.fsm.State()) + } + + ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID) + if tc.expect.err != "" { + testutil.RequireErrorContains(t, err, tc.expect.err) + return + } + require.Equal(t, tc.expect.haveGateways, haveGateways) + require.NotNil(t, ring) + + var addrs []string + ring.Do(func(value any) { + addr, ok := value.(string) + + require.True(t, ok) + addrs = append(addrs, addr) + }) + require.Equal(t, tc.expect.addrs, addrs) + } + + // NOTE: The following tests are set up to run serially with RunStep to save on the setup/teardown cost for a test server. + tt := []testCase{ + { + name: "unknown peering", + setup: func(store *state.Store) { + // Test peering is not written during setup + }, + peerID: acceptorPeerID, + expect: expectation{ + err: fmt.Sprintf(`there is no active peering for %q`, acceptorPeerID), + }, + }, + { + name: "no server addresses", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(1, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "acceptor", + ID: acceptorPeerID, + // Acceptor peers do not have PeerServerAddresses populated locally. 
+ }, + })) + }, + peerID: acceptorPeerID, + expect: expectation{ + err: "no known addresses", + }, + }, + { + name: "only server addrs are returned when mesh config does not exist", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(2, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "dialer", + ID: dialerPeerID, + PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + })) + + // Mesh config entry does not exist + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "only server addrs are returned when not peering through gateways", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureConfigEntry(3, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: false, // Peering through gateways is not enabled + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "only server addrs are returned when peering through gateways without gateways registered", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureConfigEntry(4, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // No gateways are registered + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: false, + + // Fall back to remote server addresses + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "gateway addresses are included after gateways are registered", + setup: func(store *state.Store) { + require.NoError(t, srv.fsm.State().EnsureRegistration(5, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-1", + Address: "5.6.7.8", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: 8443, + TaggedAddresses: map[string]structs.ServiceAddress{ + structs.TaggedAddressWAN: { + Address: "my-lb-addr.not-aws.com", + Port: 443, + }, + }, + }, + })) + require.NoError(t, srv.fsm.State().EnsureRegistration(6, &structs.RegisterRequest{ + ID: types.NodeID(testUUID()), + Node: "gateway-node-2", + Address: "6.7.8.9", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-2", + Service: "mesh-gateway", + Port: 8443, + TaggedAddresses: map[string]structs.ServiceAddress{ + structs.TaggedAddressWAN: { + Address: "my-other-lb-addr.not-aws.com", + Port: 443, + }, + }, + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + haveGateways: true, + + // Gateways come first, and we use their LAN addresses since this is for outbound communication. 
+ addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"}, + }, + }, + { + name: "addresses are not returned if the peering is deleted", + setup: func(store *state.Store) { + require.NoError(t, store.PeeringWrite(5, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "dialer", + ID: dialerPeerID, + PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + + // Mark as deleted + State: pbpeering.PeeringState_DELETING, + DeletedAt: structs.TimeToProto(time.Now()), + }, + })) + }, + peerID: dialerPeerID, + expect: expectation{ + err: fmt.Sprintf(`there is no active peering for %q`, dialerPeerID), + }, + }, + } + + for _, tc := range tt { + testutil.RunStep(t, tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) { return func(ctx context.Context, addr string) (net.Conn, error) { d := net.Dialer{} diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 281156a222..bf9d1b791a 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -709,7 +709,7 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) { return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config") case cfg == nil: logger.Warn("cannot begin stream because Connect CA is not yet initialized") - return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized") + return "", grpcstatus.Error(codes.Unavailable, "Connect CA is not yet initialized") } return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil } diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index 1378f9b9bf..63747a9404 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/go-hclog" cachetype "github.com/hashicorp/consul/agent/cache-types" @@ -564,7 +565,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn peeringListCtx, cancel := context.WithCancel(ctx) err := s.dataSources.PeeringList.Notify(peeringListCtx, &cachetype.PeeringListRequest{ Request: &pbpeering.PeeringListRequest{ - Partition: structs.WildcardSpecifier, + Partition: acl.WildcardPartitionName, }, }, peerServersWatchID, s.ch) if err != nil { diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index eb41aaaf98..9d884e3014 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -247,7 +247,7 @@ func genVerifyPeeringListWatchForMeshGateway() verifyWatchRequest { return func(t testing.TB, request any) { reqReal, ok := request.(*cachetype.PeeringListRequest) require.True(t, ok) - require.Equal(t, structs.WildcardSpecifier, reqReal.Request.Partition) + require.Equal(t, acl.WildcardPartitionName, reqReal.Request.Partition) } } diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index c5abb3d9af..c6eb78dea9 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -1,6 +1,7 @@ package peering import ( + "container/ring" "context" "errors" "fmt" @@ -8,6 +9,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -36,6 +38,15 @@ 
var ( errPeeringTokenEmptyPeerID = errors.New("peering token peer ID value is empty") ) +const ( + // meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway. + // This wait provides some time for the first gateway address to configure a route to the peer servers. + // Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation: + // https://www.hashicorp.com/cgsb + meshGatewayWait = 350 * time.Millisecond + establishmentTimeout = 5 * time.Second +) + // errPeeringInvalidServerAddress is returned when an establish request contains // an invalid server address. type errPeeringInvalidServerAddress struct { @@ -118,9 +129,9 @@ type Backend interface { // It returns the server name to validate, and the CA certificate to validate with. GetTLSMaterials(generatingToken bool) (string, []string, error) - // GetServerAddresses returns the addresses used for establishing a peering connection. + // GetLocalServerAddresses returns the addresses used for establishing a peering connection. // These may be server addresses or mesh gateway addresses if peering through mesh gateways. - GetServerAddresses() ([]string, error) + GetLocalServerAddresses() ([]string, error) // EncodeToken packages a peering token into a slice of bytes. EncodeToken(tok *structs.PeeringToken) ([]byte, error) @@ -128,6 +139,12 @@ type Backend interface { // DecodeToken unpackages a peering token from a slice of bytes. DecodeToken([]byte) (*structs.PeeringToken, error) + // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, + // a boolean indicating whether mesh gateways are present, and an optional error. + // The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local + // datacenter is configured to dial through mesh gateways. + GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) + EnterpriseCheckPartitions(partition string) error EnterpriseCheckNamespaces(namespace string) error @@ -298,7 +315,7 @@ func (s *Server) GenerateToken( if len(req.ServerExternalAddresses) > 0 { serverAddrs = req.ServerExternalAddresses } else { - serverAddrs, err = s.Backend.GetServerAddresses() + serverAddrs, err = s.Backend.GetLocalServerAddresses() if err != nil { return nil, err } @@ -419,7 +436,13 @@ func (s *Server) Establish( PeerServerName: tok.ServerName, PeerID: tok.PeerID, Meta: req.Meta, - State: pbpeering.PeeringState_ESTABLISHING, + + // State is intentionally not set until after the secret exchange succeeds. + // This is to prevent a scenario where an active peering is re-established, + // the secret exchange fails, and the peering state gets stuck in "Establishing" + // while the original connection is still active. + // State: pbpeering.PeeringState_ESTABLISHING, + // PartitionOrEmpty is used to avoid writing "default" in OSS. Partition: entMeta.PartitionOrEmpty(), Remote: &pbpeering.RemoteInfo{ @@ -428,39 +451,30 @@ func (s *Server) Establish( }, } - tlsOption, err := peering.TLSDialOption() - if err != nil { - return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) + // Write the peering ahead of the ExchangeSecret handshake to give + // mesh gateways in the default partition an opportunity + // to update their config with an outbound route to this peer server. + // + // If the request to exchange a secret fails then the peering will continue to exist. 
+ // We do not undo this write because this call to establish may actually be a re-establish call + // for an active peering. + writeReq := &pbpeering.PeeringWriteRequest{ + Peering: peering, + } + if err := s.Backend.PeeringWrite(writeReq); err != nil { + return nil, fmt.Errorf("failed to write peering: %w", err) } - exchangeReq := pbpeerstream.ExchangeSecretRequest{ - PeerID: peering.PeerID, - EstablishmentSecret: tok.EstablishmentSecret, - } - var exchangeResp *pbpeerstream.ExchangeSecretResponse - - // Loop through the known server addresses once, attempting to fetch the long-lived stream secret. - var dialErrors error - for _, addr := range serverAddrs { - exchangeResp, err = exchangeSecret(ctx, addr, tlsOption, &exchangeReq) - if err != nil { - dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret with %q: %w", addr, err)) - } - if exchangeResp != nil { - break - } - } + exchangeResp, dialErrors := s.exchangeSecret(ctx, peering, tok.EstablishmentSecret) if exchangeResp == nil { return nil, dialErrors } + peering.State = pbpeering.PeeringState_ESTABLISHING // As soon as a peering is written with a non-empty list of ServerAddresses // and an active stream secret, a leader routine will see the peering and // attempt to establish a peering stream with the remote peer. - // - // This peer now has a record of both the LocalPeerID(ID) and - // RemotePeerID(PeerID) but at this point the other peer does not. - writeReq := &pbpeering.PeeringWriteRequest{ + writeReq = &pbpeering.PeeringWriteRequest{ Peering: peering, SecretsRequest: &pbpeering.SecretsWriteRequest{ PeerID: peering.ID, @@ -474,7 +488,6 @@ func (s *Server) Establish( if err := s.Backend.PeeringWrite(writeReq); err != nil { return nil, fmt.Errorf("failed to write peering: %w", err) } - // TODO(peering): low prio: consider adding response details return resp, nil } @@ -493,20 +506,78 @@ func (s *Server) validatePeeringLocality(token *structs.PeeringToken) error { return nil } -func exchangeSecret(ctx context.Context, addr string, tlsOption grpc.DialOption, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error) { - dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second) +// exchangeSecret will continuously attempt to exchange the given establishment secret with the peer, up to a timeout. +// This function will attempt to dial through mesh gateways if the local DC is configured to peer through gateways, +// but will fall back to server addresses if not. +func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, establishmentSecret string) (*pbpeerstream.ExchangeSecretResponse, error) { + req := pbpeerstream.ExchangeSecretRequest{ + PeerID: peering.PeerID, + EstablishmentSecret: establishmentSecret, + } + + tlsOption, err := peering.TLSDialOption() + if err != nil { + return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) + } + + ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID) + if err != nil { + return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err) + } + + var ( + resp *pbpeerstream.ExchangeSecretResponse + dialErrors error + ) + + retryWait := 150 * time.Millisecond + jitter := retry.NewJitter(25) + + if usingGateways { + // If we are dialing through local gateways we sleep before issuing the first request. + // This gives the local gateways some time to configure a route to the peer servers. 
+ time.Sleep(meshGatewayWait) + } + + retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout) defer cancel() - conn, err := grpc.DialContext(dialCtx, addr, - tlsOption, - ) - if err != nil { - return nil, fmt.Errorf("failed to dial peer: %w", err) - } - defer conn.Close() + for retryCtx.Err() == nil { + addr := ringBuf.Value.(string) - client := pbpeerstream.NewPeerStreamServiceClient(conn) - return client.ExchangeSecret(ctx, req) + dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + conn, err := grpc.DialContext(dialCtx, addr, + tlsOption, + ) + if err != nil { + return nil, fmt.Errorf("failed to dial peer: %w", err) + } + defer conn.Close() + + client := pbpeerstream.NewPeerStreamServiceClient(conn) + resp, err = client.ExchangeSecret(ctx, &req) + + // If we got a permission denied error that means our establishment secret is invalid, so we do not retry. + grpcErr, ok := grpcstatus.FromError(err) + if ok && grpcErr.Code() == codes.PermissionDenied { + return nil, fmt.Errorf("a new peering token must be generated: %w", grpcErr.Err()) + } + if err != nil { + dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret through address %q: %w", addr, err)) + } + if resp != nil { + // Got a valid response. We're done. + break + } + + time.Sleep(jitter(retryWait)) + + // Cycle to the next possible address. + ringBuf = ringBuf.Next() + } + return resp, dialErrors } // OPTIMIZE: Handle blocking queries diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 300e463bee..f2db059e75 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/google/tcpproxy" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -476,6 +477,188 @@ func TestPeeringService_Establish(t *testing.T) { }) } +func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { + // This test is timing-sensitive, must not be run in parallel.
+ // t.Parallel() + + acceptor := newTestServer(t, func(conf *consul.Config) { + conf.NodeName = "acceptor" + }) + acceptorClient := pbpeering.NewPeeringServiceClient(acceptor.ClientConn(t)) + + dialer := newTestServer(t, func(conf *consul.Config) { + conf.NodeName = "dialer" + conf.Datacenter = "dc2" + conf.PrimaryDatacenter = "dc2" + }) + dialerClient := pbpeering.NewPeeringServiceClient(dialer.ClientConn(t)) + + var peeringToken string + + testutil.RunStep(t, "retry until timeout on dial errors", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + testToken := structs.PeeringToken{ + ServerAddresses: []string{fmt.Sprintf("127.0.0.1:%d", freeport.GetOne(t))}, + PeerID: testUUID(t), + } + testTokenJSON, _ := json.Marshal(&testToken) + testTokenB64 := base64.StdEncoding.EncodeToString(testTokenJSON) + + start := time.Now() + _, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: testTokenB64, + }) + require.Error(t, err) + testutil.RequireErrorContains(t, err, "connection refused") + + require.Greater(t, time.Since(start), 5*time.Second) + }) + + testutil.RunStep(t, "peering can be established from token", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // Generate a peering token for dialer + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + // Capture peering token for re-use later + peeringToken = tokenResp.PeeringToken + + // The context timeout is short, it checks that we do not wait the 350ms that we do when peering through mesh gateways + ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond) + t.Cleanup(cancel) + + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + }) + + testutil.RunStep(t, "fail fast on permission denied", func(t *testing.T) { + // This test case re-uses the previous token since the establishment secret will have been invalidated. + // The context timeout is short, it checks that we do not retry. + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + t.Cleanup(cancel) + + _, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: peeringToken, + }) + testutil.RequireErrorContains(t, err, "a new peering token must be generated") + }) + + gatewayPort := freeport.GetOne(t) + + testutil.RunStep(t, "fail past bad mesh gateway", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + // Generate a new peering token for the dialer. + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + store := dialer.Server.FSM().State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + // Register a gateway that isn't actually listening. 
+ require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{ + ID: types.NodeID(testUUID(t)), + Node: "gateway-node-1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mesh-gateway-1", + Service: "mesh-gateway", + Port: gatewayPort, + }, + })) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // Call to establish should succeed when we fall back to remote server address. + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + }) + + testutil.RunStep(t, "route through gateway", func(t *testing.T) { + // Spin up a proxy listening at the gateway port registered above. + gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort) + + // Configure a TCP proxy with an SNI route corresponding to the acceptor cluster. + var proxy tcpproxy.Proxy + target := &connWrapper{ + proxy: tcpproxy.DialProxy{ + Addr: acceptor.PublicGRPCAddr, + }, + } + proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target) + proxy.AddStopACMESearch(gatewayAddr) + + require.NoError(t, proxy.Start()) + t.Cleanup(func() { + proxy.Close() + proxy.Wait() + }) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + // Generate a new peering token for the dialer. + tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"}) + require.NoError(t, err) + + store := dialer.Server.FSM().State() + require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{ + Peering: &structs.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + })) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(cancel) + + start := time.Now() + + // Call to establish should succeed through the proxy. + _, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{ + PeerName: "my-peer-acceptor", + PeeringToken: tokenResp.PeeringToken, + }) + require.NoError(t, err) + + // Dialing through a gateway is preceded by a mandatory 350ms sleep. + require.Greater(t, time.Since(start), 350*time.Millisecond) + + // target.called is true when the tcproxy's conn handler was invoked. + // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway. + require.True(t, target.called) + }) +} + +// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection. 
+type connWrapper struct { + proxy tcpproxy.DialProxy + called bool +} + +func (w *connWrapper) HandleConn(src net.Conn) { + w.called = true + w.proxy.HandleConn(src) +} + func TestPeeringService_Establish_ACLEnforcement(t *testing.T) { validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160") validTokenJSON, _ := json.Marshal(&validToken) diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl new file mode 100644 index 0000000000..f81ab0edd6 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/base.hcl @@ -0,0 +1,5 @@ +primary_datacenter = "alpha" +log_level = "trace" +peering { + enabled = true +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl new file mode 100644 index 0000000000..996df1d213 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/config_entries.hcl @@ -0,0 +1,32 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "tcp" + } + }, + { + kind = "mesh" + peering { + peer_through_mesh_gateways = true + } + }, + { + kind = "exported-services" + name = "default" + services = [ + { + name = "s2" + consumers = [ + { + peer = "alpha-to-primary" + } + ] + } + ] + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl new file mode 100644 index 0000000000..bcdcb2e8b3 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4432 +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl new file mode 100644 index 0000000000..e97ec23666 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s1.hcl @@ -0,0 +1 @@ +# We don't want an s1 service in this peer diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl new file mode 100644 index 0000000000..01d4505c67 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/service_s2.hcl @@ -0,0 +1,7 @@ +services { + name = "s2" + port = 8181 + connect { + sidecar_service {} + } +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh new file mode 100644 index 0000000000..6d341b20a2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/setup.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -euo pipefail + +register_services alpha + +gen_envoy_bootstrap s2 19002 alpha +gen_envoy_bootstrap mesh-gateway 19003 alpha true + +wait_for_config_entry proxy-defaults global alpha +wait_for_config_entry exported-services default alpha +wait_for_config_entry mesh mesh alpha diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats 
b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats new file mode 100644 index 0000000000..06314df903 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/alpha/verify.bats @@ -0,0 +1,49 @@ +#!/usr/bin/env bats + +load helpers + +@test "s2 proxy is running correct version" { + assert_envoy_version 19002 +} + +@test "s2 proxy admin is up on :19002" { + retry_default curl -f -s localhost:19002/stats -o /dev/null +} + +@test "gateway-alpha proxy admin is up on :19003" { + retry_default curl -f -s localhost:19003/stats -o /dev/null +} + +@test "s2 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s2 alpha +} + +@test "s2 proxy should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "gateway-alpha should be up and listening" { + retry_long nc -z consul-alpha-client:4432 +} + +@test "s2 proxies should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "dialer gateway-alpha should have healthy endpoints for alpha servers" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.alpha.peering HEALTHY 1 +} + +@test "dialer gateway-alpha should have healthy endpoints for primary servers" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.primary.peering HEALTHY 1 +} + +# Re-peering the clusters is a way to have alpha dial out through its own gateway +# since we know it is configured with endpoints for primary from the first time they peered. +@test "re-peer the two clusters together" { + create_peering primary alpha +} + +@test "alpha servers made connection to primary servers via alpha gateway" { + assert_envoy_metric_at_least 127.0.0.1:19003 "cluster.server.primary.peering.*cx_total" 1 +} \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl new file mode 100644 index 0000000000..f54393f03e --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/bind.hcl @@ -0,0 +1,2 @@ +bind_addr = "0.0.0.0" +advertise_addr = "{{ GetInterfaceIP \"eth0\" }}" \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh new file mode 100644 index 0000000000..f7205ae515 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/capture.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +snapshot_envoy_admin localhost:19001 mesh-gateway primary || true +snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl new file mode 100644 index 0000000000..c1e134d5a2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/base.hcl @@ -0,0 +1,3 @@ +peering { + enabled = true +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl new file mode 100644 index 0000000000..6baeb569f2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/config_entries.hcl @@ -0,0 +1,18 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + 
name = "global" + + config { + protocol = "tcp" + } + }, + { + kind = "mesh" + peering { + peer_through_mesh_gateways = true + } + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl new file mode 100644 index 0000000000..831a70ff32 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4431 +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl new file mode 100644 index 0000000000..af0773c29a --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s1.hcl @@ -0,0 +1,20 @@ +services { + name = "s1" + port = 8080 + connect { + sidecar_service { + proxy { + upstreams = [ + { + destination_name = "s2" + destination_peer = "primary-to-alpha" + local_bind_port = 5000 + mesh_gateway { + mode = "local" + } + } + ] + } + } + } +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl new file mode 100644 index 0000000000..77164e722b --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/service_s2.hcl @@ -0,0 +1 @@ +# We don't want an s2 service in the primary dc \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh new file mode 100644 index 0000000000..3aa37f8cb0 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/setup.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -euo pipefail + +register_services primary + +gen_envoy_bootstrap s1 19000 primary +gen_envoy_bootstrap mesh-gateway 19001 primary true + +wait_for_config_entry proxy-defaults global +wait_for_config_entry mesh mesh alpha diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats new file mode 100644 index 0000000000..bc21d5ee3f --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/primary/verify.bats @@ -0,0 +1,69 @@ +#!/usr/bin/env bats + +load helpers + +@test "s1 proxy is running correct version" { + assert_envoy_version 19000 +} + +@test "s1 proxy admin is up on :19000" { + retry_default curl -f -s localhost:19000/stats -o /dev/null +} + +@test "gateway-primary proxy admin is up on :19001" { + retry_default curl -f -s localhost:19001/stats -o /dev/null +} + +@test "s1 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s1 +} + +@test "s2 proxies should be healthy in alpha" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "gateway-primary should be up and listening" { + retry_long nc -z consul-primary-client:4431 +} + +@test "gateway-alpha should be up and listening" { + retry_long nc -z consul-alpha-client:4432 +} + +@test "peer the two clusters together" { + create_peering primary alpha +} + +@test "acceptor gateway-primary should have healthy endpoints for primary servers" { + 
assert_upstream_has_endpoints_in_status 127.0.0.1:19001 server.primary.peering HEALTHY 1 +} + +@test "alpha servers made connection to primary servers via primary gateway" { + assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.server.primary.peering.*cx_total" 1 +} + +@test "s2 alpha proxies should be healthy in primary" { + assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha +} + +@test "gateway-alpha should have healthy endpoints for s2" { + assert_upstream_has_endpoints_in_status consul-alpha-client:19003 exported~s2.default.alpha HEALTHY 1 +} + +@test "s1 upstream should have healthy endpoints for s2" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2.default.primary-to-alpha.external HEALTHY 1 +} + +@test "s1 upstream should be able to connect to s2" { + run retry_default curl -s -f -d hello localhost:5000 + [ "$status" -eq 0 ] + [ "$output" = "hello" ] +} + +@test "s1 upstream made 1 connection to s2" { + assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.primary-to-alpha.external.*cx_total" 1 +} + +@test "s1 upstream made 1 connection to s2 through the primary mesh gateway" { + assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.s2.default.default.alpha-to-primary.external.*cx_total" 1 +} diff --git a/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh new file mode 100644 index 0000000000..c856de1733 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peer-control-plane-mgw/vars.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +export REQUIRED_SERVICES="s1 s1-sidecar-proxy gateway-primary s2-alpha s2-sidecar-proxy-alpha gateway-alpha" +export REQUIRE_PEERS=1