Update leader routine to maybe use gateways

This commit is contained in:
freddygv 2022-10-10 13:54:36 -06:00
parent e69bc727ec
commit 2c99a21596
4 changed files with 318 additions and 75 deletions

View File

@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
continue continue
} }
status, found := s.peerStreamServer.StreamStatus(peer.ID) // We may have written this peering to the store to trigger xDS updates, but still in the process of establishing.
// If there isn't a secret yet, we're still trying to reach the other server.
logger.Trace("reading peering secret", "sequence_id", seq)
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
continue
}
// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream? status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found && status.Connected { if found && status.Connected {
// Nothing to do when we already have an active stream to the peer. // Nothing to do when we already have an active stream to the peer.
// Updated data will only be used if the stream becomes disconnected
// since there's no need to tear down an active stream.
continue continue
} }
logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq) logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq)
@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
cancel() cancel()
} }
if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil { if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs. // TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service? // Lockable status isn't available here though. Could report it via the peering.Service?
logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err) logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err)
@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq) logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)
// Clean up active streams of peerings that were deleted from the state store. // Clean up active streams of peerings that were deleted from the state store.
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
for stream, doneCh := range connectedStreams { for stream, doneCh := range connectedStreams {
if _, ok := stored[stream]; ok { if _, ok := stored[stream]; ok {
// Active stream is in the state store, nothing to do. // Active stream is in the state store, nothing to do.
@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
return merr.ErrorOrNil() return merr.ErrorOrNil()
} }
func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error { func (s *Server) establishStream(ctx context.Context,
logger hclog.Logger,
peer *pbpeering.Peering,
secret *pbpeering.PeeringSecrets,
cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID) logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
if peer.PeerID == "" { if peer.PeerID == "" {
@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return fmt.Errorf("failed to build TLS dial option from peering: %w", err) return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
} }
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" { if secret.GetStream().GetActiveSecretID() == "" {
return errors.New("missing stream secret for peering stream authorization, peering must be re-established") return errors.New("missing stream secret for peering stream authorization, peering must be re-established")
} }
@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// Start a goroutine to watch for updates to peer server addresses. // Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr. // The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string) nextServerAddr := make(chan string)
go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr) go s.watchAddresses(streamCtx, peer.ID, nextServerAddr)
// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(streamCtx, logger, func() error { go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors. // Try a new address on each iteration by advancing the ring buffer on errors.
addr := <-nextServerAddr addr, stillOpen := <-nextServerAddr
if !stillOpen {
// If the channel was closed that means the context was canceled, so we return.
return streamCtx.Err()
}
opts := []grpc.DialOption{ opts := []grpc.DialOption{
tlsOption, tlsOption,
// TODO(peering): Use a grpc.WithStatsHandler here.
// This should wait until the grpc-external server is wired up with a stats handler in NET-50.
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that. // For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, Time: 30 * time.Second,
@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}), }),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
} }
// TODO(peering): use a grpc.WithStatsHandler here?)
logger.Trace("dialing peer", "addr", addr) logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...) conn, err := grpc.DialContext(streamCtx, addr, opts...)
@ -400,83 +416,75 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}, func(err error) { }, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error. // TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error()) streamStatus.TrackSendError(err.Error())
if isErrCode(err, codes.FailedPrecondition) {
switch {
case isErrCode(err, codes.FailedPrecondition):
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting", logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err) "error", err)
return
} else if isErrCode(err, codes.ResourceExhausted) { case isErrCode(err, codes.ResourceExhausted):
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting", logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err) "error", err)
return
} case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
logger.Debug("stream context was canceled", "error", err)
case err != nil:
logger.Error("error managing peering stream", "error", err) logger.Error("error managing peering stream", "error", err)
}
}, peeringRetryTimeout) }, peeringRetryTimeout)
return nil return nil
} }
// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr. // watchAddresses sends an up-to-date address to nextServerAddr.
// It loads the server addresses into a ring buffer and cycles through them until: // These could be either remote peer server addresses, or local mesh gateways.
// 1. streamCtx is cancelled (peer is deleted) // The function loads the addresses into a ring buffer and cycles through them until:
// 2. the peer is modified and the watchset fires. // 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data)
// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires.
// //
// In case (2) we refetch the peering and rebuild the ring buffer. // In case (2) we re-fetch all the data sources and rebuild the ring buffer.
func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) { // In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with
// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses.
func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) {
defer close(nextServerAddr) defer close(nextServerAddr)
// we initialize the ring buffer with the peer passed to `establishStream` var ringbuf *ring.Ring
// because the caller has pre-checked `peer.ShouldDial`, guaranteeing var ws memdb.WatchSet
// at least one server address.
//
// IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block.
ringbuf := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
innerWs := memdb.NewWatchSet()
_, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.",
"peer_id", peer.ID,
"error", err)
}
fetchAddrs := func() error { fetchAddresses := func() error {
// reinstantiate innerWs to prevent it from growing indefinitely // Re-instantiate ws since it can only be watched once.
innerWs = memdb.NewWatchSet() ws = memdb.NewWatchSet()
_, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err)
}
if !peering.IsActive() {
return fmt.Errorf("peer %q is no longer active", peer.ID)
}
if len(peering.PeerServerAddresses) == 0 {
return fmt.Errorf("peer %q has no addresses to dial", peer.ID)
}
ringbuf = ring.New(len(peering.PeerServerAddresses)) newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID)
for _, addr := range peering.PeerServerAddresses { if err != nil {
ringbuf.Value = addr return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err)
ringbuf = ringbuf.Next()
} }
ringbuf = newRing
return nil return nil
} }
// Initialize the first ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err)
}
for { for {
select { select {
case nextServerAddr <- ringbuf.Value.(string): case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next() ringbuf = ringbuf.Next()
case err := <-innerWs.WatchCh(ctx):
case err := <-ws.WatchCh(ctx):
if err != nil { if err != nil {
// context was cancelled // Context was cancelled.
return return
} }
// watch fired so we refetch the peering and rebuild the ring buffer
if err := fetchAddrs(); err != nil { // Watch fired so we re-fetch the necessary addresses and replace the ring buffer.
s.logger.Warn("watchset for peer was fired but failed to update server addresses", if err := fetchAddresses(); err != nil {
"peer_id", peer.ID, s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated",
"peer_id", peerID,
"error", err) "error", err)
} }
} }

View File

@ -8,10 +8,12 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math" "math"
"net"
"testing" "testing"
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/google/tcpproxy"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -1861,3 +1863,239 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
}) })
}) })
} }
func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca := connect.TestCA(t, nil)
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Configure peering to go through mesh gateways
store := dialer.fsm.State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// Register a gateway that isn't actually listening.
require.NoError(t, store.EnsureRegistration(2, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-1",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-1",
Service: "mesh-gateway",
Port: freeport.GetOne(t),
},
}))
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
// The peering should eventually connect because we fall back to the token's server addresses.
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
}
func Test_Leader_PeeringSync_PeerThroughMeshGateways_Success(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca := connect.TestCA(t, nil)
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCTLSPort = freeport.GetOne(t)
c.CAConfig = &structs.CAConfiguration{
ClusterID: connect.TestClusterID,
Provider: structs.ConsulCAProvider,
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
},
}
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Configure peering to go through mesh gateways
store := dialer.fsm.State()
require.NoError(t, store.EnsureConfigEntry(1, &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}))
// Register a mesh gateway and a tcpproxy listening at its address.
gatewayPort := freeport.GetOne(t)
gatewayAddr := fmt.Sprintf("127.0.0.1:%d", gatewayPort)
require.NoError(t, store.EnsureRegistration(3, &structs.RegisterRequest{
ID: types.NodeID(testUUID()),
Node: "gateway-node-2",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway-2",
Service: "mesh-gateway",
Port: gatewayPort,
},
}))
// Configure a TCP proxy with an SNI route corresponding to the acceptor cluster.
var proxy tcpproxy.Proxy
target := &connWrapper{
proxy: tcpproxy.DialProxy{
Addr: fmt.Sprintf("127.0.0.1:%d", acceptor.config.GRPCTLSPort),
},
}
proxy.AddSNIRoute(gatewayAddr, "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul", target)
proxy.AddStopACMESearch(gatewayAddr)
require.NoError(t, proxy.Start())
t.Cleanup(func() {
proxy.Close()
proxy.Wait()
})
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
// The peering should eventually connect through the gateway address.
retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
// target.called is true when the tcproxy's conn handler was invoked.
// This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.
require.True(t, target.called)
}
// connWrapper is a wrapper around tcpproxy.DialProxy to enable tracking whether the proxy handled a connection.
type connWrapper struct {
proxy tcpproxy.DialProxy
called bool
}
func (w *connWrapper) HandleConn(src net.Conn) {
w.called = true
w.proxy.HandleConn(src)
}

View File

@ -139,12 +139,7 @@ func (b *PeeringBackend) fetchPeerServerAddresses(ws memdb.WatchSet, peerID stri
if !peering.IsActive() { if !peering.IsActive() {
return nil, fmt.Errorf("there is no active peering for %q", peerID) return nil, fmt.Errorf("there is no active peering for %q", peerID)
} }
return bufferFromAddresses(peering.PeerServerAddresses)
// IMPORTANT: The address ring buffer must always be length > 0
if len(peering.PeerServerAddresses) == 0 {
return nil, fmt.Errorf("peer %q has no addresses to dial", peerID)
}
return bufferFromAddresses(peering.PeerServerAddresses), nil
} }
// maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the // maybeFetchGatewayAddresses will return a ring buffer with the latest gateway addresses if the
@ -157,12 +152,10 @@ func (b *PeeringBackend) maybeFetchGatewayAddresses(ws memdb.WatchSet) (*ring.Ri
} }
if useGateways { if useGateways {
addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false) addresses, err := meshGatewayAdresses(b.srv.fsm.State(), ws, false)
if err != nil {
// IMPORTANT: The address ring buffer must always be length > 0
if err != nil || len(addresses) == 0 {
return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err) return nil, fmt.Errorf("error fetching local mesh gateway addresses: %w", err)
} }
return bufferFromAddresses(addresses), nil return bufferFromAddresses(addresses)
} }
return nil, nil return nil, nil
} }
@ -182,13 +175,17 @@ func (b *PeeringBackend) PeerThroughMeshGateways(ws memdb.WatchSet) (bool, error
} }
func bufferFromAddresses(addresses []string) *ring.Ring { func bufferFromAddresses(addresses []string) (*ring.Ring, error) {
// IMPORTANT: The address ring buffer must always be length > 0
if len(addresses) == 0 {
return nil, fmt.Errorf("no known addresses")
}
ring := ring.New(len(addresses)) ring := ring.New(len(addresses))
for _, addr := range addresses { for _, addr := range addresses {
ring.Value = addr ring.Value = addr
ring = ring.Next() ring = ring.Next()
} }
return ring return ring, nil
} }
func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) { func meshGatewayAdresses(state *state.Store, ws memdb.WatchSet, wan bool) ([]string, error) {

View File

@ -232,7 +232,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: acceptorPeerID, peerID: acceptorPeerID,
expect: expectation{ expect: expectation{
err: fmt.Sprintf(`peer %q has no addresses to dial`, acceptorPeerID), err: "no known addresses",
}, },
}, },
{ {