Merge pull request #14981 from hashicorp/peering/dial-through-gateways

This commit is contained in:
Freddy 2022-10-14 09:44:56 -06:00 committed by GitHub
commit 24d0c8801a
28 changed files with 1177 additions and 125 deletions

.changelog/14981.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
peering: add support for routine peering control-plane traffic through mesh gateways
```
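For context, the feature is switched on with the `mesh` config entry's `PeerThroughMeshGateways` flag. A minimal sketch of how the tests in this PR enable it against a server's state store; the package name and helper function here are illustrative, not part of the change:

```go
package example

import (
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/structs"
)

// enablePeeringThroughGateways writes the mesh config entry that tells servers
// to route peering control-plane traffic through local mesh gateways.
func enablePeeringThroughGateways(store *state.Store, idx uint64) error {
	return store.EnsureConfigEntry(idx, &structs.MeshConfigEntry{
		Peering: &structs.PeeringMeshConfig{
			PeerThroughMeshGateways: true,
		},
	})
}
```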

View File

@ -4,7 +4,8 @@
package acl package acl
const ( const (
DefaultPartitionName = "" WildcardPartitionName = ""
DefaultPartitionName = ""
) )
// Reviewer Note: This is a little bit strange; one might want it to be "" like partition name // Reviewer Note: This is a little bit strange; one might want it to be "" like partition name

View File

@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
continue continue
} }
status, found := s.peerStreamServer.StreamStatus(peer.ID) // We may have written this peering to the store to trigger xDS updates, but still in the process of establishing.
// If there isn't a secret yet, we're still trying to reach the other server.
logger.Trace("reading peering secret", "sequence_id", seq)
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
continue
}
// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream? status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found && status.Connected { if found && status.Connected {
// Nothing to do when we already have an active stream to the peer. // Nothing to do when we already have an active stream to the peer.
// Updated data will only be used if the stream becomes disconnected
// since there's no need to tear down an active stream.
continue continue
} }
logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq) logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq)
@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
cancel() cancel()
} }
if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil { if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs. // TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service? // Lockable status isn't available here though. Could report it via the peering.Service?
logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err) logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err)
@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq) logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)
// Clean up active streams of peerings that were deleted from the state store. // Clean up active streams of peerings that were deleted from the state store.
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
for stream, doneCh := range connectedStreams { for stream, doneCh := range connectedStreams {
if _, ok := stored[stream]; ok { if _, ok := stored[stream]; ok {
// Active stream is in the state store, nothing to do. // Active stream is in the state store, nothing to do.
@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
return merr.ErrorOrNil() return merr.ErrorOrNil()
} }
func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error { func (s *Server) establishStream(ctx context.Context,
logger hclog.Logger,
peer *pbpeering.Peering,
secret *pbpeering.PeeringSecrets,
cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID) logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
if peer.PeerID == "" { if peer.PeerID == "" {
@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return fmt.Errorf("failed to build TLS dial option from peering: %w", err) return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
} }
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" { if secret.GetStream().GetActiveSecretID() == "" {
return errors.New("missing stream secret for peering stream authorization, peering must be re-established") return errors.New("missing stream secret for peering stream authorization, peering must be re-established")
} }
@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// Start a goroutine to watch for updates to peer server addresses. // Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr. // The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string) nextServerAddr := make(chan string)
go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr) go s.watchAddresses(streamCtx, peer.ID, nextServerAddr)
// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(streamCtx, logger, func() error { go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors. // Try a new address on each iteration by advancing the ring buffer on errors.
addr := <-nextServerAddr addr, stillOpen := <-nextServerAddr
if !stillOpen {
// If the channel was closed that means the context was canceled, so we return.
return streamCtx.Err()
}
opts := []grpc.DialOption{ opts := []grpc.DialOption{
tlsOption, tlsOption,
// TODO(peering): Use a grpc.WithStatsHandler here.
// This should wait until the grpc-external server is wired up with a stats handler in NET-50.
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that. // For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, Time: 30 * time.Second,
@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}), }),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
} }
// TODO(peering): use a grpc.WithStatsHandler here?)
logger.Trace("dialing peer", "addr", addr) logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...) conn, err := grpc.DialContext(streamCtx, addr, opts...)
@ -400,83 +416,75 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}, func(err error) { }, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error. // TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error()) streamStatus.TrackSendError(err.Error())
if isErrCode(err, codes.FailedPrecondition) {
switch {
case isErrCode(err, codes.FailedPrecondition):
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting", logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err) "error", err)
return
} else if isErrCode(err, codes.ResourceExhausted) { case isErrCode(err, codes.ResourceExhausted):
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting", logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err) "error", err)
return
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
logger.Debug("stream context was canceled", "error", err)
case err != nil:
logger.Error("error managing peering stream", "error", err)
} }
logger.Error("error managing peering stream", "error", err)
}, peeringRetryTimeout) }, peeringRetryTimeout)
return nil return nil
} }
// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr. // watchAddresses sends an up-to-date address to nextServerAddr.
// It loads the server addresses into a ring buffer and cycles through them until: // These could be either remote peer server addresses, or local mesh gateways.
// 1. streamCtx is cancelled (peer is deleted) // The function loads the addresses into a ring buffer and cycles through them until:
// 2. the peer is modified and the watchset fires. // 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data)
// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires.
// //
// In case (2) we refetch the peering and rebuild the ring buffer. // In case (2) we re-fetch all the data sources and rebuild the ring buffer.
func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) { // In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with
// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses.
func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) {
defer close(nextServerAddr) defer close(nextServerAddr)
// we initialize the ring buffer with the peer passed to `establishStream` var ringbuf *ring.Ring
// because the caller has pre-checked `peer.ShouldDial`, guaranteeing var ws memdb.WatchSet
// at least one server address.
//
// IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block.
ringbuf := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
innerWs := memdb.NewWatchSet()
_, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.",
"peer_id", peer.ID,
"error", err)
}
fetchAddrs := func() error { fetchAddresses := func() error {
// reinstantiate innerWs to prevent it from growing indefinitely // Re-instantiate ws since it can only be watched once.
innerWs = memdb.NewWatchSet() ws = memdb.NewWatchSet()
_, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err) return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err)
}
if !peering.IsActive() {
return fmt.Errorf("peer %q is no longer active", peer.ID)
}
if len(peering.PeerServerAddresses) == 0 {
return fmt.Errorf("peer %q has no addresses to dial", peer.ID)
} }
ringbuf = newRing
ringbuf = ring.New(len(peering.PeerServerAddresses))
for _, addr := range peering.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
return nil return nil
} }
// Initialize the first ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err)
}
for { for {
select { select {
case nextServerAddr <- ringbuf.Value.(string): case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next() ringbuf = ringbuf.Next()
case err := <-innerWs.WatchCh(ctx):
case err := <-ws.WatchCh(ctx):
if err != nil { if err != nil {
// context was cancelled // Context was cancelled.
return return
} }
// watch fired so we refetch the peering and rebuild the ring buffer
if err := fetchAddrs(); err != nil { // Watch fired so we re-fetch the necessary addresses and replace the ring buffer.
s.logger.Warn("watchset for peer was fired but failed to update server addresses", if err := fetchAddresses(); err != nil {
"peer_id", peer.ID, s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated",
"peer_id", peerID,
"error", err) "error", err)
} }
} }
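To make the channel-close handling above concrete, here is a minimal, self-contained sketch of the producer/consumer pattern used by watchAddresses and the dial loop: a goroutine cycles a ring of candidate addresses into nextServerAddr and closes the channel when its context ends, so the consumer must check the second return value of the receive rather than treating a zero value as an address. The addresses and function names are illustrative, and the watch-set rebuild is omitted:

```go
package main

import (
	"container/ring"
	"context"
	"fmt"
)

func watchAddrs(ctx context.Context, addrs []string, next chan<- string) {
	defer close(next) // closing signals the consumer that the context ended

	rb := ring.New(len(addrs)) // ring must have length > 0 or receives would block
	for _, a := range addrs {
		rb.Value = a
		rb = rb.Next()
	}

	for {
		select {
		case next <- rb.Value.(string):
			rb = rb.Next() // advance so the next attempt tries a different address
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	next := make(chan string)
	go watchAddrs(ctx, []string{"10.0.0.1:8443", "10.0.0.2:8443"}, next)

	for i := 0; i < 3; i++ {
		addr, stillOpen := <-next
		if !stillOpen {
			return // channel closed: context was cancelled
		}
		fmt.Println("dialing", addr)
	}
	cancel()
}
```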

View File

@ -8,10 +8,12 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math" "math"
"net"
"testing" "testing"
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/google/tcpproxy"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -567,7 +569,7 @@ func testLeader_PeeringSync_failsForTLSError(t *testing.T, tokenMutateFn func(to
} }
// Since the Establish RPC dials the remote cluster, it will yield the TLS error. // Since the Establish RPC dials the remote cluster, it will yield the TLS error.
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
_, err = s2Client.Establish(ctx, &establishReq) _, err = s2Client.Establish(ctx, &establishReq)
require.Contains(t, err.Error(), expectErr) require.Contains(t, err.Error(), expectErr)
@ -1861,3 +1863,239 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
}) })
}) })
} }
func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca := connect.TestCA(t, nil)
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Configure peering to go through mesh gateways
store := dialer.fsm.State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// Register a gateway that isn't actually listening.
require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-1",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-1",
Service: "mesh-gateway",
Port: freeport.GetOne(t),
},
}))
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
// The peering should eventually connect because we fall back to the token's server addresses.
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
}
func Test_Leader_PeeringSync_PeerThroughMeshGateways_Success(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca := connect.TestCA(t, nil)
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Configure peering to go through mesh gateways
store := dialer.fsm.State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// Register a mesh gateway and a tcpproxy listening at its address.
gatewayPort := freeport.GetOne(t)
gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort)
require.NoError(t, store.EnsureRegistration(3, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-2",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-2",
Service: "mesh-gateway",
Port: gatewayPort,
},
}))
// Configure a TCP proxy with an SNI route corresponding to the acceptor cluster.
var proxy tcpproxy.Proxy
target := &connWrapper{
proxy: tcpproxy.DialProxy{
Addr: fmt.Sprintf("127.0.0.1:%d", acceptor.config.GRPCTLSPort),
},
}
proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target)
proxy.AddStopACMESearch(gatewayAddr)
require.NoError(t, proxy.Start())
t.Cleanup(func() {
proxy.Close()
proxy.Wait()
})
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
// The peering should eventually connect through the gateway address.
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
// target.called is true when the tcpproxy's conn handler was invoked.
// This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.
require.True(t, target.called)
}
// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection.
type connWrapper struct {
proxy tcpproxy.DialProxy
called bool
}
func (w *connWrapper) HandleConn(src net.Conn) {
w.called = true
w.proxy.HandleConn(src)
}

View File

@ -1,12 +1,16 @@
package consul package consul
import ( import (
"container/ring"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"sync" "sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
@ -85,29 +89,114 @@ func (b *PeeringBackend) GetTLSMaterials(generatingToken bool) (string, []string
return serverName, caPems, nil return serverName, caPems, nil
} }
// GetServerAddresses looks up server or mesh gateway addresses from the state store. // GetLocalServerAddresses looks up server or mesh gateway addresses from the state store for a peer to dial.
func (b *PeeringBackend) GetServerAddresses() ([]string, error) { func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) {
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta()) store := b.srv.fsm.State()
if err != nil {
return nil, fmt.Errorf("failed to read mesh config entry: %w", err)
}
meshConfig, ok := rawEntry.(*structs.MeshConfigEntry) useGateways, err := b.PeerThroughMeshGateways(nil)
if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways { if err != nil {
return meshGatewayAdresses(b.srv.fsm.State()) // For inbound traffic we prefer to fail fast if we can't determine whether we should peer through MGW.
// This prevents unexpectedly sharing local server addresses when a user only intended to peer through gateways.
return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err)
} }
return serverAddresses(b.srv.fsm.State()) if useGateways {
return meshGatewayAdresses(store, nil, true)
}
return serverAddresses(store)
} }
func meshGatewayAdresses(state *state.Store) ([]string, error) { // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword) // a boolean indicating whether mesh gateways are present, and an optional error.
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present.
func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) {
newRing, err := b.fetchPeerServerAddresses(ws, peerID)
if err != nil {
return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err)
}
gatewayRing, err := b.maybeFetchGatewayAddresses(ws)
if err != nil {
// If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses.
logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly: %w", "error", err)
return newRing, false, nil
}
if gatewayRing != nil {
// The ordering is important here. We always want to start with the mesh gateway
// addresses and fallback to the remote addresses, so we append the server addresses
// in newRing to gatewayRing.
newRing = gatewayRing.Link(newRing)
}
return newRing, gatewayRing != nil, nil
}
// fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses.
// If the peering is no longer active or does not have addresses, then we return an error.
func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID string) (*ring.Ring, error) {
_, peering, err := b.Store().PeeringReadByID(ws, peerID)
if err != nil {
return nil, fmt.Errorf("failed to fetch peer %q: %w", peerID, err)
}
if !peering.IsActive() {
return nil, fmt.Errorf("there is no active peering for %q", peerID)
}
return bufferFromAddresses(peering.PeerServerAddresses)
}
// maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the
// local datacenter is configured to peer through mesh gateways and there are local gateways registered.
// If neither of these are true then we return a nil buffer.
func (b *PeeringBackend) maybeFetchGatewayAddresses(ws memdb.WatchSet) (*ring.Ring, error) {
useGateways, err := b.PeerThroughMeshGateways(ws)
if err != nil {
return nil, fmt.Errorf("failed to determine if peering should happen through mesh gateways: %w", err)
}
if useGateways {
addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false)
if err != nil {
return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err)
}
return bufferFromAddresses(addresses)
}
return nil, nil
}
// PeerThroughMeshGateways determines if the config entry to enable peering control plane
// traffic through a mesh gateway is set to enable.
func (b *PeeringBackend) PeerThroughMeshGateways(ws memdb.WatchSet) (bool, error) {
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(ws, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
if err != nil {
return false, fmt.Errorf("failed to read mesh config entry: %w", err)
}
mesh, ok := rawEntry.(*structs.MeshConfigEntry)
if rawEntry != nil && !ok {
return false, fmt.Errorf("got unexpected type for mesh config entry: %T", rawEntry)
}
return mesh.PeerThroughMeshGateways(), nil
}
func bufferFromAddresses(addresses []string) (*ring.Ring, error) {
// IMPORTANT: The address ring buffer must always be length > 0
if len(addresses) == 0 {
return nil, fmt.Errorf("no known addresses")
}
ring := ring.New(len(addresses))
for _, addr := range addresses {
ring.Value = addr
ring = ring.Next()
}
return ring, nil
}
func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) {
_, nodes, err := state.ServiceDump(ws, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to dump gateway addresses: %w", err) return nil, fmt.Errorf("failed to dump gateway addresses: %w", err)
} }
var addrs []string var addrs []string
for _, node := range nodes { for _, node := range nodes {
_, addr, port := node.BestAddress(true) _, addr, port := node.BestAddress(wan)
addrs = append(addrs, ipaddr.FormatAddressPort(addr, port)) addrs = append(addrs, ipaddr.FormatAddressPort(addr, port))
} }
if len(addrs) == 0 { if len(addrs) == 0 {
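The "gateways first, then server addresses" ordering produced by GetDialAddresses comes from container/ring's Link. A small runnable sketch using the same addresses as the test case later in this diff; bufferFromAddresses is re-sketched here without its error handling so the example stands alone:

```go
package main

import (
	"container/ring"
	"fmt"
)

func bufferFromAddresses(addresses []string) *ring.Ring {
	r := ring.New(len(addresses)) // must be non-empty
	for _, addr := range addresses {
		r.Value = addr
		r = r.Next()
	}
	return r
}

func main() {
	serverRing := bufferFromAddresses([]string{"1.2.3.4:8502", "2.3.4.5:8503"})
	gatewayRing := bufferFromAddresses([]string{"5.6.7.8:8443", "6.7.8.9:8443"})

	// Link inserts the server ring right after the current gateway element and
	// returns the element that used to follow it, so iteration covers the
	// remaining gateway addresses before reaching the server addresses.
	combined := gatewayRing.Link(serverRing)

	combined.Do(func(v any) {
		fmt.Println(v.(string)) // 6.7.8.9:8443, 5.6.7.8:8443, 1.2.3.4:8502, 2.3.4.5:8503
	})
}
```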

View File

@ -10,6 +10,7 @@ import (
gogrpc "google.golang.org/grpc" gogrpc "google.golang.org/grpc"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
@ -76,7 +77,7 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
}) })
} }
func TestPeeringBackend_GetServerAddresses(t *testing.T) { func TestPeeringBackend_GetLocalServerAddresses(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -91,7 +92,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) {
backend := NewPeeringBackend(srv) backend := NewPeeringBackend(srv)
testutil.RunStep(t, "peer to servers", func(t *testing.T) { testutil.RunStep(t, "peer to servers", func(t *testing.T) {
addrs, err := backend.GetServerAddresses() addrs, err := backend.GetLocalServerAddresses()
require.NoError(t, err) require.NoError(t, err)
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort) expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
@ -107,7 +108,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) {
} }
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
addrs, err := backend.GetServerAddresses() addrs, err := backend.GetLocalServerAddresses()
require.NoError(t, err) require.NoError(t, err)
// Still expect server address because PeerThroughMeshGateways was not enabled. // Still expect server address because PeerThroughMeshGateways was not enabled.
@ -121,7 +122,7 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) {
} }
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh)) require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
addrs, err := backend.GetServerAddresses() addrs, err := backend.GetLocalServerAddresses()
require.Nil(t, addrs) require.Nil(t, addrs)
testutil.RequireErrorContains(t, err, testutil.RequireErrorContains(t, err,
"servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered") "servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
@ -147,12 +148,221 @@ func TestPeeringBackend_GetServerAddresses(t *testing.T) {
} }
require.NoError(t, srv.fsm.State().EnsureRegistration(2, &reg)) require.NoError(t, srv.fsm.State().EnsureRegistration(2, &reg))
addrs, err := backend.GetServerAddresses() addrs, err := backend.GetLocalServerAddresses()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []string{"154.238.12.252:8443"}, addrs) require.Equal(t, []string{"154.238.12.252:8443"}, addrs)
}) })
} }
func TestPeeringBackend_GetDialAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, cfg := testServerConfig(t)
cfg.GRPCTLSPort = freeport.GetOne(t)
srv, err := newServer(t, cfg)
require.NoError(t, err)
testrpc.WaitForLeader(t, srv.RPC, "dc1")
backend := NewPeeringBackend(srv)
dialerPeerID := testUUID()
acceptorPeerID := testUUID()
type expectation struct {
addrs []string
haveGateways bool
err string
}
type testCase struct {
name string
setup func(store *state.Store)
peerID string
expect expectation
}
run := func(t *testing.T, tc testCase) {
if tc.setup != nil {
tc.setup(srv.fsm.State())
}
ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID)
if tc.expect.err != "" {
testutil.RequireErrorContains(t, err, tc.expect.err)
return
}
require.Equal(t, tc.expect.haveGateways, haveGateways)
require.NotNil(t, ring)
var addrs []string
ring.Do(func(value any) {
addr, ok := value.(string)
require.True(t, ok)
addrs = append(addrs, addr)
})
require.Equal(t, tc.expect.addrs, addrs)
}
// NOTE: The following tests are set up to run serially with RunStep to save on the setup/teardown cost for a test server.
tt := []testCase{
{
name: "unknown peering",
setup: func(store *state.Store) {
// Test peering is not written during setup
},
peerID: acceptorPeerID,
expect: expectation{
err: fmt.Sprintf(`there is no active peering for %q`, acceptorPeerID),
},
},
{
name: "no server addresses",
setup: func(store *state.Store) {
require.NoError(t, store.PeeringWrite(1, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "acceptor",
ID: acceptorPeerID,
// Acceptor peers do not have PeerServerAddresses populated locally.
},
}))
},
peerID: acceptorPeerID,
expect: expectation{
err: "no known addresses",
},
},
{
name: "only server addrs are returned when mesh config does not exist",
setup: func(store *state.Store) {
require.NoError(t, store.PeeringWrite(2, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "dialer",
ID: dialerPeerID,
PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
},
}))
// Mesh config entry does not exist
},
peerID: dialerPeerID,
expect: expectation{
haveGateways: false,
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
},
},
{
name: "only server addrs are returned when not peering through gateways",
setup: func(store *state.Store) {
require.NoError(t, srv.fsm.State().EnsureConfigEntry(3, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: false, // Peering through gateways is not enabled
},
}))
},
peerID: dialerPeerID,
expect: expectation{
haveGateways: false,
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
},
},
{
name: "only server addrs are returned when peering through gateways without gateways registered",
setup: func(store *state.Store) {
require.NoError(t, srv.fsm.State().EnsureConfigEntry(4, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// No gateways are registered
},
peerID: dialerPeerID,
expect: expectation{
haveGateways: false,
// Fall back to remote server addresses
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
},
},
{
name: "gateway addresses are included after gateways are registered",
setup: func(store *state.Store) {
require.NoError(t, srv.fsm.State().EnsureRegistration(5, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-1",
Address: "5.6.7.8",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-1",
Service: "mesh-gateway",
Port: 8443,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWAN: {
Address: "my-lb-addr.not-aws.com",
Port: 443,
},
},
},
}))
require.NoError(t, srv.fsm.State().EnsureRegistration(6, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-2",
Address: "6.7.8.9",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-2",
Service: "mesh-gateway",
Port: 8443,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWAN: {
Address: "my-other-lb-addr.not-aws.com",
Port: 443,
},
},
},
}))
},
peerID: dialerPeerID,
expect: expectation{
haveGateways: true,
// Gateways come first, and we use their LAN addresses since this is for outbound communication.
addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"},
},
},
{
name: "addresses are not returned if the peering is deleted",
setup: func(store *state.Store) {
require.NoError(t, store.PeeringWrite(5, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "dialer",
ID: dialerPeerID,
PeerServerAddresses: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
// Mark as deleted
State: pbpeering.PeeringState_DELETING,
DeletedAt: structs.TimeToProto(time.Now()),
},
}))
},
peerID: dialerPeerID,
expect: expectation{
err: fmt.Sprintf(`there is no active peering for %q`, dialerPeerID),
},
},
}
for _, tc := range tt {
testutil.RunStep(t, tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) { func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) { return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{} d := net.Dialer{}

View File

@ -709,7 +709,7 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config") return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
case cfg == nil: case cfg == nil:
logger.Warn("cannot begin stream because Connect CA is not yet initialized") logger.Warn("cannot begin stream because Connect CA is not yet initialized")
return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized") return "", grpcstatus.Error(codes.Unavailable, "Connect CA is not yet initialized")
} }
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
} }
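The dialing side classifies stream errors by their gRPC status code (see the switch added in leader_peering.go earlier in this diff), which is why the code returned here matters. A minimal sketch of that kind of check; the isErrCode helper below is an assumption about its shape, not the PR's implementation:

```go
package example

import (
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

// isErrCode reports whether err carries the given gRPC status code.
// Sketch only: the real helper used by the leader routine is not shown in this diff.
func isErrCode(err error, code codes.Code) bool {
	s, ok := grpcstatus.FromError(err)
	return ok && s.Code() == code
}

func classify(err error) string {
	switch {
	case isErrCode(err, codes.FailedPrecondition), isErrCode(err, codes.ResourceExhausted):
		return "reconnect quietly" // the dialer logs these at debug and retries
	default:
		return "report as error"
	}
}
```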

View File

@ -9,6 +9,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
@ -564,7 +565,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
peeringListCtx, cancel := context.WithCancel(ctx) peeringListCtx, cancel := context.WithCancel(ctx)
err := s.dataSources.PeeringList.Notify(peeringListCtx, &cachetype.PeeringListRequest{ err := s.dataSources.PeeringList.Notify(peeringListCtx, &cachetype.PeeringListRequest{
Request: &pbpeering.PeeringListRequest{ Request: &pbpeering.PeeringListRequest{
Partition: structs.WildcardSpecifier, Partition: acl.WildcardPartitionName,
}, },
}, peerServersWatchID, s.ch) }, peerServersWatchID, s.ch)
if err != nil { if err != nil {

View File

@ -247,7 +247,7 @@ func genVerifyPeeringListWatchForMeshGateway() verifyWatchRequest {
return func(t testing.TB, request any) { return func(t testing.TB, request any) {
reqReal, ok := request.(*cachetype.PeeringListRequest) reqReal, ok := request.(*cachetype.PeeringListRequest)
require.True(t, ok) require.True(t, ok)
require.Equal(t, structs.WildcardSpecifier, reqReal.Request.Partition) require.Equal(t, acl.WildcardPartitionName, reqReal.Request.Partition)
} }
} }

View File

@ -1,6 +1,7 @@
package peering package peering
import ( import (
"container/ring"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -8,6 +9,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
@ -36,6 +38,15 @@ var (
errPeeringTokenEmptyPeerID = errors.New("peering token peer ID value is empty") errPeeringTokenEmptyPeerID = errors.New("peering token peer ID value is empty")
) )
const (
// meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway.
// This wait provides some time for the first gateway address to configure a route to the peer servers.
// Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation:
// https://www.hashicorp.com/cgsb
meshGatewayWait = 350 * time.Millisecond
establishmentTimeout = 5 * time.Second
)
// errPeeringInvalidServerAddress is returned when an establish request contains // errPeeringInvalidServerAddress is returned when an establish request contains
// an invalid server address. // an invalid server address.
type errPeeringInvalidServerAddress struct { type errPeeringInvalidServerAddress struct {
@ -118,9 +129,9 @@ type Backend interface {
// It returns the server name to validate, and the CA certificate to validate with. // It returns the server name to validate, and the CA certificate to validate with.
GetTLSMaterials(generatingToken bool) (string, []string, error) GetTLSMaterials(generatingToken bool) (string, []string, error)
// GetServerAddresses returns the addresses used for establishing a peering connection. // GetLocalServerAddresses returns the addresses used for establishing a peering connection.
// These may be server addresses or mesh gateway addresses if peering through mesh gateways. // These may be server addresses or mesh gateway addresses if peering through mesh gateways.
GetServerAddresses() ([]string, error) GetLocalServerAddresses() ([]string, error)
// EncodeToken packages a peering token into a slice of bytes. // EncodeToken packages a peering token into a slice of bytes.
EncodeToken(tok *structs.PeeringToken) ([]byte, error) EncodeToken(tok *structs.PeeringToken) ([]byte, error)
@ -128,6 +139,12 @@ type Backend interface {
// DecodeToken unpackages a peering token from a slice of bytes. // DecodeToken unpackages a peering token from a slice of bytes.
DecodeToken([]byte) (*structs.PeeringToken, error) DecodeToken([]byte) (*structs.PeeringToken, error)
// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
// a boolean indicating whether mesh gateways are present, and an optional error.
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local
// datacenter is configured to dial through mesh gateways.
GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error)
EnterpriseCheckPartitions(partition string) error EnterpriseCheckPartitions(partition string) error
EnterpriseCheckNamespaces(namespace string) error EnterpriseCheckNamespaces(namespace string) error
@ -298,7 +315,7 @@ func (s *Server) GenerateToken(
if len(req.ServerExternalAddresses) > 0 { if len(req.ServerExternalAddresses) > 0 {
serverAddrs = req.ServerExternalAddresses serverAddrs = req.ServerExternalAddresses
} else { } else {
serverAddrs, err = s.Backend.GetServerAddresses() serverAddrs, err = s.Backend.GetLocalServerAddresses()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -419,7 +436,13 @@ func (s *Server) Establish(
PeerServerName: tok.ServerName, PeerServerName: tok.ServerName,
PeerID: tok.PeerID, PeerID: tok.PeerID,
Meta: req.Meta, Meta: req.Meta,
State: pbpeering.PeeringState_ESTABLISHING,
// State is intentionally not set until after the secret exchange succeeds.
// This is to prevent a scenario where an active peering is re-established,
// the secret exchange fails, and the peering state gets stuck in "Establishing"
// while the original connection is still active.
// State: pbpeering.PeeringState_ESTABLISHING,
// PartitionOrEmpty is used to avoid writing "default" in OSS. // PartitionOrEmpty is used to avoid writing "default" in OSS.
Partition: entMeta.PartitionOrEmpty(), Partition: entMeta.PartitionOrEmpty(),
Remote: &pbpeering.RemoteInfo{ Remote: &pbpeering.RemoteInfo{
@ -428,39 +451,30 @@ func (s *Server) Establish(
}, },
} }
tlsOption, err := peering.TLSDialOption() // Write the peering ahead of the ExchangeSecret handshake to give
if err != nil { // mesh gateways in the default partition an opportunity
return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) // to update their config with an outbound route to this peer server.
//
// If the request to exchange a secret fails then the peering will continue to exist.
// We do not undo this write because this call to establish may actually be a re-establish call
// for an active peering.
writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering,
}
if err := s.Backend.PeeringWrite(writeReq); err != nil {
return nil, fmt.Errorf("failed to write peering: %w", err)
} }
exchangeReq := pbpeerstream.ExchangeSecretRequest{ exchangeResp, dialErrors := s.exchangeSecret(ctx, peering, tok.EstablishmentSecret)
PeerID: peering.PeerID,
EstablishmentSecret: tok.EstablishmentSecret,
}
var exchangeResp *pbpeerstream.ExchangeSecretResponse
// Loop through the known server addresses once, attempting to fetch the long-lived stream secret.
var dialErrors error
for _, addr := range serverAddrs {
exchangeResp, err = exchangeSecret(ctx, addr, tlsOption, &exchangeReq)
if err != nil {
dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret with %q: %w", addr, err))
}
if exchangeResp != nil {
break
}
}
if exchangeResp == nil { if exchangeResp == nil {
return nil, dialErrors return nil, dialErrors
} }
peering.State = pbpeering.PeeringState_ESTABLISHING
// As soon as a peering is written with a non-empty list of ServerAddresses // As soon as a peering is written with a non-empty list of ServerAddresses
// and an active stream secret, a leader routine will see the peering and // and an active stream secret, a leader routine will see the peering and
// attempt to establish a peering stream with the remote peer. // attempt to establish a peering stream with the remote peer.
// writeReq = &pbpeering.PeeringWriteRequest{
// This peer now has a record of both the LocalPeerID(ID) and
// RemotePeerID(PeerID) but at this point the other peer does not.
writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering, Peering: peering,
SecretsRequest: &pbpeering.SecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
PeerID: peering.ID, PeerID: peering.ID,
@ -474,7 +488,6 @@ func (s *Server) Establish(
if err := s.Backend.PeeringWrite(writeReq); err != nil { if err := s.Backend.PeeringWrite(writeReq); err != nil {
return nil, fmt.Errorf("failed to write peering: %w", err) return nil, fmt.Errorf("failed to write peering: %w", err)
} }
// TODO(peering): low prio: consider adding response details
return resp, nil return resp, nil
} }
@ -493,20 +506,78 @@ func (s *Server) validatePeeringLocality(token *structs.PeeringToken) error {
return nil return nil
} }
func exchangeSecret(ctx context.Context, addr string, tlsOption grpc.DialOption, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error) { // exchangeSecret will continuously attempt to exchange the given establishment secret with the peer, up to a timeout.
dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second) // This function will attempt to dial through mesh gateways if the local DC is configured to peer through gateways,
// but will fall back to server addresses if not.
func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, establishmentSecret string) (*pbpeerstream.ExchangeSecretResponse, error) {
req := pbpeerstream.ExchangeSecretRequest{
PeerID: peering.PeerID,
EstablishmentSecret: establishmentSecret,
}
tlsOption, err := peering.TLSDialOption()
if err != nil {
return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err)
}
ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID)
if err != nil {
return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err)
}
var (
resp *pbpeerstream.ExchangeSecretResponse
dialErrors error
)
retryWait := 150 * time.Millisecond
jitter := retry.NewJitter(25)
if usingGateways {
// If we are dialing through local gateways we sleep before issuing the first request.
// This gives the local gateways some time to configure a route to the peer servers.
time.Sleep(meshGatewayWait)
}
retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout)
defer cancel() defer cancel()
conn, err := grpc.DialContext(dialCtx, addr, for retryCtx.Err() == nil {
tlsOption, addr := ringBuf.Value.(string)
)
if err != nil {
return nil, fmt.Errorf("failed to dial peer: %w", err)
}
defer conn.Close()
client := pbpeerstream.NewPeerStreamServiceClient(conn) dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
return client.ExchangeSecret(ctx, req) defer cancel()
conn, err := grpc.DialContext(dialCtx, addr,
tlsOption,
)
if err != nil {
return nil, fmt.Errorf("failed to dial peer: %w", err)
}
defer conn.Close()
client := pbpeerstream.NewPeerStreamServiceClient(conn)
resp, err = client.ExchangeSecret(ctx, &req)
// If we got a permission denied error that means our establishment secret is invalid, so we do not retry.
grpcErr, ok := grpcstatus.FromError(err)
if ok && grpcErr.Code() == codes.PermissionDenied {
return nil, fmt.Errorf("a new peering token must be generated: %w", grpcErr.Err())
}
if err != nil {
dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret through address %q: %w", addr, err))
}
if resp != nil {
// Got a valid response. We're done.
break
}
time.Sleep(jitter(retryWait))
// Cycle to the next possible address.
ringBuf = ringBuf.Next()
}
return resp, dialErrors
} }
// OPTIMIZE: Handle blocking queries // OPTIMIZE: Handle blocking queries
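Putting the pieces of exchangeSecret together, the retry shape is: an optional 350ms pause when dialing through gateways, then bounded attempts that cycle the address ring with a jittered wait between tries. A simplified, self-contained sketch under those assumptions; a stdlib jitter stands in for retry.NewJitter, and attempt is a placeholder for the ExchangeSecret RPC:

```go
package example

import (
	"container/ring"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

const (
	meshGatewayWait      = 350 * time.Millisecond // initial wait when dialing through gateways
	establishmentTimeout = 5 * time.Second        // overall deadline for the exchange
	retryWait            = 150 * time.Millisecond // base wait between attempts
)

// attempt stands in for one ExchangeSecret RPC against addr.
func attempt(ctx context.Context, addr string) error {
	return errors.New("dial failed") // always fails in this sketch
}

func exchangeWithRetries(ctx context.Context, addrs *ring.Ring, usingGateways bool) error {
	if usingGateways {
		// Give local gateways a moment to program a route to the peer's servers.
		time.Sleep(meshGatewayWait)
	}

	retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout)
	defer cancel()

	var errs error
	for retryCtx.Err() == nil {
		addr := addrs.Value.(string)
		err := attempt(retryCtx, addr)
		if err == nil {
			return nil
		}
		errs = errors.Join(errs, fmt.Errorf("address %q: %w", addr, err))

		// Jitter up to ~25% of the base wait, then advance to the next address.
		time.Sleep(retryWait + time.Duration(rand.Int63n(int64(retryWait/4))))
		addrs = addrs.Next()
	}
	return errs
}
```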

View File

@ -12,6 +12,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/tcpproxy"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -476,6 +477,188 @@ func TestPeeringService_Establish(t *testing.T) {
}) })
} }
func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
// This test is timing-sensitive and must not be run in parallel.
// t.Parallel()
acceptor := newTestServer(t, func(conf *consul.Config) {
conf.NodeName = "acceptor"
})
acceptorClient := pbpeering.NewPeeringServiceClient(acceptor.ClientConn(t))
dialer := newTestServer(t, func(conf *consul.Config) {
conf.NodeName = "dialer"
conf.Datacenter = "dc2"
conf.PrimaryDatacenter = "dc2"
})
dialerClient := pbpeering.NewPeeringServiceClient(dialer.ClientConn(t))
var peeringToken string
testutil.RunStep(t, "retry until timeout on dial errors", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
testToken := structs.PeeringToken{
ServerAddresses: []string{fmt.Sprintf("127.0.0.1:%d", freeport.GetOne(t))},
PeerID: testUUID(t),
}
testTokenJSON, _ := json.Marshal(&testToken)
testTokenB64 := base64.StdEncoding.EncodeToString(testTokenJSON)
start := time.Now()
_, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: testTokenB64,
})
require.Error(t, err)
testutil.RequireErrorContains(t, err, "connection refused")
require.Greater(t, time.Since(start), 5*time.Second)
})
testutil.RunStep(t, "peering can be established from token", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
// Generate a peering token for dialer
tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"})
require.NoError(t, err)
// Capture peering token for re-use later
peeringToken = tokenResp.PeeringToken
// The context timeout is short; it verifies that we do not wait the 350ms that we would when peering through mesh gateways
ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond)
t.Cleanup(cancel)
_, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: tokenResp.PeeringToken,
})
require.NoError(t, err)
})
testutil.RunStep(t, "fail fast on permission denied", func(t *testing.T) {
// This test case re-uses the previous token since the establishment secret will have been invalidated.
// The context timeout is short; it verifies that we do not retry.
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
t.Cleanup(cancel)
_, err := dialerClient.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: peeringToken,
})
testutil.RequireErrorContains(t, err, "a new peering token must be generated")
})
gatewayPort := freeport.GetOne(t)
testutil.RunStep(t, "fail past bad mesh gateway", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)
// Generate a new peering token for the dialer.
tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"})
require.NoError(t, err)
store := dialer.Server.FSM().State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// Register a gateway that isn't actually listening.
require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{
ID: types.NodeID(testUUID(t)),
Node: "gateway-node-1",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-1",
Service: "mesh-gateway",
Port: gatewayPort,
},
}))
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
// Call to establish should succeed when we fall back to remote server address.
_, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: tokenResp.PeeringToken,
})
require.NoError(t, err)
})
testutil.RunStep(t, "route through gateway", func(t *testing.T) {
// Spin up a proxy listening at the gateway port registered above.
gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort)
// Configure a TCP proxy with an SNI route corresponding to the acceptor cluster.
var proxy tcpproxy.Proxy
target := &connWrapper{
proxy: tcpproxy.DialProxy{
Addr: acceptor.PublicGRPCAddr,
},
}
proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target)
proxy.AddStopACMESearch(gatewayAddr)
require.NoError(t, proxy.Start())
t.Cleanup(func() {
proxy.Close()
proxy.Wait()
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)
// Generate a new peering token for the dialer.
tokenResp, err := acceptorClient.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer-dialer"})
require.NoError(t, err)
store := dialer.Server.FSM().State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)
start := time.Now()
// Call to establish should succeed through the proxy.
_, err = dialerClient.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: tokenResp.PeeringToken,
})
require.NoError(t, err)
// Dialing through a gateway is preceded by a mandatory 350ms sleep.
require.Greater(t, time.Since(start), 350*time.Millisecond)
// target.called is true when the tcpproxy's conn handler was invoked.
// This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.
require.True(t, target.called)
})
}
// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection.
type connWrapper struct {
proxy tcpproxy.DialProxy
called bool
}
func (w *connWrapper) HandleConn(src net.Conn) {
w.called = true
w.proxy.HandleConn(src)
}
func TestPeeringService_Establish_ACLEnforcement(t *testing.T) { func TestPeeringService_Establish_ACLEnforcement(t *testing.T) {
validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160") validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160")
validTokenJSON, _ := json.Marshal(&validToken) validTokenJSON, _ := json.Marshal(&validToken)

View File

@ -0,0 +1,5 @@
primary_datacenter = "alpha"
log_level = "trace"
peering {
enabled = true
}

View File

@ -0,0 +1,32 @@
config_entries {
bootstrap = [
{
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
},
{
kind = "mesh"
peering {
peer_through_mesh_gateways = true
}
},
{
kind = "exported-services"
name = "default"
services = [
{
name = "s2"
consumers = [
{
peer = "alpha-to-primary"
}
]
}
]
}
]
}

View File

@ -0,0 +1,5 @@
services {
name = "mesh-gateway"
kind = "mesh-gateway"
port = 4432
}

View File

@ -0,0 +1 @@
# We don't want an s1 service in this peer

View File

@ -0,0 +1,7 @@
services {
name = "s2"
port = 8181
connect {
sidecar_service {}
}
}

View File

@ -0,0 +1,12 @@
#!/bin/bash
set -euo pipefail
register_services alpha
gen_envoy_bootstrap s2 19002 alpha
gen_envoy_bootstrap mesh-gateway 19003 alpha true
wait_for_config_entry proxy-defaults global alpha
wait_for_config_entry exported-services default alpha
wait_for_config_entry mesh mesh alpha

View File

@ -0,0 +1,49 @@
#!/usr/bin/env bats
load helpers
@test "s2 proxy is running correct version" {
assert_envoy_version 19002
}
@test "s2 proxy admin is up on :19002" {
retry_default curl -f -s localhost:19002/stats -o /dev/null
}
@test "gateway-alpha proxy admin is up on :19003" {
retry_default curl -f -s localhost:19003/stats -o /dev/null
}
@test "s2 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s2 alpha
}
@test "s2 proxy should be healthy" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha-client:4432
}
@test "s2 proxies should be healthy" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "dialer gateway-alpha should have healthy endpoints for alpha servers" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.alpha.peering HEALTHY 1
}
@test "dialer gateway-alpha should have healthy endpoints for primary servers" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19003 server.primary.peering HEALTHY 1
}
# Re-peering the clusters is a way to have alpha dial out through its own gateway
# since we know it is configured with endpoints for primary from the first time they peered.
@test "re-peer the two clusters together" {
create_peering primary alpha
}
@test "alpha servers made connection to primary servers via alpha gateway" {
assert_envoy_metric_at_least 127.0.0.1:19003 "cluster.server.primary.peering.*cx_total" 1
}

View File

@ -0,0 +1,2 @@
bind_addr = "0.0.0.0"
advertise_addr = "{{ GetInterfaceIP \"eth0\" }}"

View File

@ -0,0 +1,4 @@
#!/bin/bash
snapshot_envoy_admin localhost:19001 mesh-gateway primary || true
snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true

View File

@ -0,0 +1,3 @@
peering {
enabled = true
}

View File

@ -0,0 +1,18 @@
config_entries {
bootstrap = [
{
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
},
{
kind = "mesh"
peering {
peer_through_mesh_gateways = true
}
}
]
}

View File

@ -0,0 +1,5 @@
services {
name = "mesh-gateway"
kind = "mesh-gateway"
port = 4431
}

View File

@ -0,0 +1,20 @@
services {
name = "s1"
port = 8080
connect {
sidecar_service {
proxy {
upstreams = [
{
destination_name = "s2"
destination_peer = "primary-to-alpha"
local_bind_port = 5000
mesh_gateway {
mode = "local"
}
}
]
}
}
}
}

View File

@ -0,0 +1 @@
# We don't want an s2 service in the primary dc

View File

@ -0,0 +1,11 @@
#!/bin/bash
set -euo pipefail
register_services primary
gen_envoy_bootstrap s1 19000 primary
gen_envoy_bootstrap mesh-gateway 19001 primary true
wait_for_config_entry proxy-defaults global
wait_for_config_entry mesh mesh alpha

View File

@ -0,0 +1,69 @@
#!/usr/bin/env bats
load helpers
@test "s1 proxy is running correct version" {
assert_envoy_version 19000
}
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "gateway-primary proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "s2 proxies should be healthy in alpha" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-primary should be up and listening" {
retry_long nc -z consul-primary-client:4431
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha-client:4432
}
@test "peer the two clusters together" {
create_peering primary alpha
}
@test "acceptor gateway-primary should have healthy endpoints for primary servers" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19001 server.primary.peering HEALTHY 1
}
@test "alpha servers made connection to primary servers via primary gateway" {
assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.server.primary.peering.*cx_total" 1
}
@test "s2 alpha proxies should be healthy in primary" {
assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha
}
@test "gateway-alpha should have healthy endpoints for s2" {
assert_upstream_has_endpoints_in_status consul-alpha-client:19003 exported~s2.default.alpha HEALTHY 1
}
@test "s1 upstream should have healthy endpoints for s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2.default.primary-to-alpha.external HEALTHY 1
}
@test "s1 upstream should be able to connect to s2" {
run retry_default curl -s -f -d hello localhost:5000
[ "$status" -eq 0 ]
[ "$output" = "hello" ]
}
@test "s1 upstream made 1 connection to s2" {
assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.primary-to-alpha.external.*cx_total" 1
}
@test "s1 upstream made 1 connection to s2 through the primary mesh gateway" {
assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.s2.default.default.alpha-to-primary.external.*cx_total" 1
}

View File

@ -0,0 +1,4 @@
#!/bin/bash
export REQUIRED_SERVICES="s1 s1-sidecar-proxy gateway-primary s2-alpha s2-sidecar-proxy-alpha gateway-alpha"
export REQUIRE_PEERS=1