mirror of https://github.com/status-im/consul.git
[OSS] fix: wait and try longer to peer through mesh gw (#15328)
This commit is contained in:
parent
bf0f61a878
commit
626249fbf5
|
@ -1956,7 +1956,8 @@ func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T
|
|||
}))
|
||||
|
||||
// Create a peering at dialer by establishing a peering with acceptor's token
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
|
||||
// 7 second = 1 second wait + 3 second gw retry + 3 second token addr retry
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 7*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
|
||||
|
|
|
@ -106,27 +106,38 @@ func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) {
|
|||
}
|
||||
|
||||
// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
|
||||
// a boolean indicating whether mesh gateways are present, and an optional error.
|
||||
// an optional buffer of just gateway addresses, and an optional error.
|
||||
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present.
|
||||
func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) {
|
||||
func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error) {
|
||||
newRing, err := b.fetchPeerServerAddresses(ws, peerID)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err)
|
||||
}
|
||||
|
||||
gatewayRing, err := b.maybeFetchGatewayAddresses(ws)
|
||||
if err != nil {
|
||||
// If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses.
|
||||
logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly: %w", "error", err)
|
||||
return newRing, false, nil
|
||||
logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly", "error", err)
|
||||
return newRing, nil, nil
|
||||
}
|
||||
if gatewayRing != nil {
|
||||
// The ordering is important here. We always want to start with the mesh gateway
|
||||
// addresses and fallback to the remote addresses, so we append the server addresses
|
||||
// in newRing to gatewayRing.
|
||||
newRing = gatewayRing.Link(newRing)
|
||||
// in newRing to gatewayRing. We also need a new ring to prevent mixing up pointers
|
||||
// with the gateway-only buffer
|
||||
compositeRing := ring.New(gatewayRing.Len() + newRing.Len())
|
||||
gatewayRing.Do(func(s any) {
|
||||
compositeRing.Value = s.(string)
|
||||
compositeRing = compositeRing.Next()
|
||||
})
|
||||
|
||||
newRing.Do(func(s any) {
|
||||
compositeRing.Value = s.(string)
|
||||
compositeRing = compositeRing.Next()
|
||||
})
|
||||
newRing = compositeRing
|
||||
}
|
||||
return newRing, gatewayRing != nil, nil
|
||||
return newRing, gatewayRing, nil
|
||||
}
|
||||
|
||||
// fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses.
|
||||
|
|
|
@ -198,7 +198,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
|
||||
type expectation struct {
|
||||
addrs []string
|
||||
haveGateways bool
|
||||
gatewayAddrs []string
|
||||
err string
|
||||
}
|
||||
|
||||
|
@ -214,14 +214,25 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
tc.setup(srv.fsm.State())
|
||||
}
|
||||
|
||||
ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID)
|
||||
ring, gatewayRing, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID)
|
||||
if tc.expect.err != "" {
|
||||
testutil.RequireErrorContains(t, err, tc.expect.err)
|
||||
return
|
||||
}
|
||||
require.Equal(t, tc.expect.haveGateways, haveGateways)
|
||||
require.Equal(t, len(tc.expect.gatewayAddrs) > 0, gatewayRing != nil)
|
||||
require.NotNil(t, ring)
|
||||
|
||||
if len(tc.expect.gatewayAddrs) > 0 {
|
||||
var addrs []string
|
||||
gatewayRing.Do(func(value any) {
|
||||
addr, ok := value.(string)
|
||||
|
||||
require.True(t, ok)
|
||||
addrs = append(addrs, addr)
|
||||
})
|
||||
require.Equal(t, tc.expect.gatewayAddrs, addrs)
|
||||
}
|
||||
|
||||
var addrs []string
|
||||
ring.Do(func(value any) {
|
||||
addr, ok := value.(string)
|
||||
|
@ -275,8 +286,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
},
|
||||
peerID: dialerPeerID,
|
||||
expect: expectation{
|
||||
haveGateways: false,
|
||||
addrs: []string{"5.6.7.8:8502"},
|
||||
addrs: []string{"5.6.7.8:8502"},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -294,8 +304,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
},
|
||||
peerID: dialerPeerID,
|
||||
expect: expectation{
|
||||
haveGateways: false,
|
||||
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -309,8 +318,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
},
|
||||
peerID: dialerPeerID,
|
||||
expect: expectation{
|
||||
haveGateways: false,
|
||||
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -326,8 +334,6 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
},
|
||||
peerID: dialerPeerID,
|
||||
expect: expectation{
|
||||
haveGateways: false,
|
||||
|
||||
// Fall back to remote server addresses
|
||||
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
},
|
||||
|
@ -372,10 +378,9 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
|
|||
},
|
||||
peerID: dialerPeerID,
|
||||
expect: expectation{
|
||||
haveGateways: true,
|
||||
|
||||
// Gateways come first, and we use their LAN addresses since this is for outbound communication.
|
||||
addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
addrs: []string{"5.6.7.8:8443", "6.7.8.9:8443", "1.2.3.4:8502", "2.3.4.5:8503"},
|
||||
gatewayAddrs: []string{"5.6.7.8:8443", "6.7.8.9:8443"},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
@ -18,6 +17,8 @@ import (
|
|||
grpcstatus "google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
|
@ -41,10 +42,10 @@ var (
|
|||
const (
|
||||
// meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway.
|
||||
// This wait provides some time for the first gateway address to configure a route to the peer servers.
|
||||
// Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation:
|
||||
// https://www.hashicorp.com/cgsb
|
||||
meshGatewayWait = 350 * time.Millisecond
|
||||
establishmentTimeout = 5 * time.Second
|
||||
// This study shows latency distribution https://www.hashicorp.com/cgsb.
|
||||
// With 1s we cover ~p96, then we initiate the 3-second retry loop.
|
||||
meshGatewayWait = 1 * time.Second
|
||||
establishmentTimeout = 3 * time.Second
|
||||
)
|
||||
|
||||
// errPeeringInvalidServerAddress is returned when an establish request contains
|
||||
|
@ -140,10 +141,10 @@ type Backend interface {
|
|||
DecodeToken([]byte) (*structs.PeeringToken, error)
|
||||
|
||||
// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
|
||||
// a boolean indicating whether mesh gateways are present, and an optional error.
|
||||
// an optional buffer of just gateway addresses, and an optional error.
|
||||
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local
|
||||
// datacenter is configured to dial through mesh gateways.
|
||||
GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error)
|
||||
GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error)
|
||||
|
||||
EnterpriseCheckPartitions(partition string) error
|
||||
|
||||
|
@ -516,11 +517,28 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
|
|||
return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err)
|
||||
}
|
||||
|
||||
ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID)
|
||||
allAddrs, gatewayAddrs, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err)
|
||||
}
|
||||
|
||||
if gatewayAddrs != nil {
|
||||
// If we are dialing through local gateways we sleep before issuing the first request.
|
||||
// This gives the local gateways some time to configure a route to the peer servers.
|
||||
time.Sleep(meshGatewayWait)
|
||||
|
||||
// Exclusively try
|
||||
resp, _ := retryExchange(ctx, &req, gatewayAddrs, tlsOption)
|
||||
if resp != nil {
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
return retryExchange(ctx, &req, allAddrs, tlsOption)
|
||||
}
|
||||
|
||||
// retryExchange attempts a secret exchange in a retry loop, taking a new address from the ring buffer on each iteration
|
||||
func retryExchange(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest, ringBuf *ring.Ring, tlsOption grpc.DialOption) (*pbpeerstream.ExchangeSecretResponse, error) {
|
||||
var (
|
||||
resp *pbpeerstream.ExchangeSecretResponse
|
||||
dialErrors error
|
||||
|
@ -529,12 +547,6 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
|
|||
retryWait := 150 * time.Millisecond
|
||||
jitter := retry.NewJitter(25)
|
||||
|
||||
if usingGateways {
|
||||
// If we are dialing through local gateways we sleep before issuing the first request.
|
||||
// This gives the local gateways some time to configure a route to the peer servers.
|
||||
time.Sleep(meshGatewayWait)
|
||||
}
|
||||
|
||||
retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout)
|
||||
defer cancel()
|
||||
|
||||
|
@ -553,7 +565,7 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
|
|||
defer conn.Close()
|
||||
|
||||
client := pbpeerstream.NewPeerStreamServiceClient(conn)
|
||||
resp, err = client.ExchangeSecret(ctx, &req)
|
||||
resp, err = client.ExchangeSecret(ctx, req)
|
||||
|
||||
// If we got a permission denied error that means out establishment secret is invalid, so we do not retry.
|
||||
grpcErr, ok := grpcstatus.FromError(err)
|
||||
|
|
|
@ -514,7 +514,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
testutil.RequireErrorContains(t, err, "connection refused")
|
||||
|
||||
require.Greater(t, time.Since(start), 5*time.Second)
|
||||
require.Greater(t, time.Since(start), 3*time.Second)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "peering can be established from token", func(t *testing.T) {
|
||||
|
@ -528,7 +528,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
|
|||
// Capture peering token for re-use later
|
||||
peeringToken = tokenResp.PeeringToken
|
||||
|
||||
// The context timeout is short, it checks that we do not wait the 350ms that we do when peering through mesh gateways
|
||||
// The context timeout is short, it checks that we do not wait the 1s that we do when peering through mesh gateways
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
|
@ -585,7 +585,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
|
|||
},
|
||||
}))
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// Call to establish should succeed when we fall back to remote server address.
|
||||
|
@ -630,7 +630,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
|
|||
},
|
||||
}))
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
||||
// Context is 1s sleep + 3s retry loop. Any longer and we're trying the remote gateway
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
start := time.Now()
|
||||
|
@ -642,8 +643,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Dialing through a gateway is preceded by a mandatory 350ms sleep.
|
||||
require.Greater(t, time.Since(start), 350*time.Millisecond)
|
||||
// Dialing through a gateway is preceded by a mandatory 1s sleep.
|
||||
require.Greater(t, time.Since(start), 1*time.Second)
|
||||
|
||||
// target.called is true when the tcproxy's conn handler was invoked.
|
||||
// This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.
|
||||
|
|
Loading…
Reference in New Issue