diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index c4e5d98fc5..afc8af28a7 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -1956,7 +1956,8 @@ func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T })) // Create a peering at dialer by establishing a peering with acceptor's token - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + // 7 second = 1 second wait + 3 second gw retry + 3 second token addr retry + ctx, cancel = context.WithTimeout(context.Background(), 7*time.Second) t.Cleanup(cancel) conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 8c762ef347..7c0b7c2e54 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -106,27 +106,38 @@ func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) { } // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, -// a boolean indicating whether mesh gateways are present, and an optional error. +// an optional buffer of just gateway addresses, and an optional error. // The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present. -func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) { +func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error) { newRing, err := b.fetchPeerServerAddresses(ws, peerID) if err != nil { - return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err) + return nil, nil, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err) } gatewayRing, err := b.maybeFetchGatewayAddresses(ws) if err != nil { // If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses. - logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly: %w", "error", err) - return newRing, false, nil + logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly", "error", err) + return newRing, nil, nil } if gatewayRing != nil { // The ordering is important here. We always want to start with the mesh gateway // addresses and fallback to the remote addresses, so we append the server addresses - // in newRing to gatewayRing. - newRing = gatewayRing.Link(newRing) + // in newRing to gatewayRing. We also need a new ring to prevent mixing up pointers + // with the gateway-only buffer + compositeRing := ring.New(gatewayRing.Len() + newRing.Len()) + gatewayRing.Do(func(s any) { + compositeRing.Value = s.(string) + compositeRing = compositeRing.Next() + }) + + newRing.Do(func(s any) { + compositeRing.Value = s.(string) + compositeRing = compositeRing.Next() + }) + newRing = compositeRing } - return newRing, gatewayRing != nil, nil + return newRing, gatewayRing, nil } // fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses. diff --git a/agent/consul/peering_backend_test.go b/agent/consul/peering_backend_test.go index 4ca441f4b9..e933bcffa0 100644 --- a/agent/consul/peering_backend_test.go +++ b/agent/consul/peering_backend_test.go @@ -198,7 +198,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { type expectation struct { addrs []string - haveGateways bool + gatewayAddrs []string err string } @@ -214,14 +214,25 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { tc.setup(srv.fsm.State()) } - ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID) + ring, gatewayRing, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID) if tc.expect.err != "" { testutil.RequireErrorContains(t, err, tc.expect.err) return } - require.Equal(t, tc.expect.haveGateways, haveGateways) + require.Equal(t, len(tc.expect.gatewayAddrs) > 0, gatewayRing != nil) require.NotNil(t, ring) + if len(tc.expect.gatewayAddrs) > 0 { + var addrs []string + gatewayRing.Do(func(value any) { + addr, ok := value.(string) + + require.True(t, ok) + addrs = append(addrs, addr) + }) + require.Equal(t, tc.expect.gatewayAddrs, addrs) + } + var addrs []string ring.Do(func(value any) { addr, ok := value.(string) @@ -275,8 +286,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: dialerPeerID, expect: expectation{ - haveGateways: false, - addrs: []string{"5.6.7.8:8502"}, + addrs: []string{"5.6.7.8:8502"}, }, }, { @@ -294,8 +304,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: dialerPeerID, expect: expectation{ - haveGateways: false, - addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, }, }, { @@ -309,8 +318,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: dialerPeerID, expect: expectation{ - haveGateways: false, - addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, + addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, }, }, { @@ -326,8 +334,6 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: dialerPeerID, expect: expectation{ - haveGateways: false, - // Fall back to remote server addresses addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, }, @@ -372,10 +378,9 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) { }, peerID: dialerPeerID, expect: expectation{ - haveGateways: true, - // Gateways come first, and we use their LAN addresses since this is for outbound communication. - addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"}, + addrs: []string{"5.6.7.8:8443", "6.7.8.9:8443", "1.2.3.4:8502", "2.3.4.5:8503"}, + gatewayAddrs: []string{"5.6.7.8:8443", "6.7.8.9:8443"}, }, }, { diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index cc25a74770..45cbb98de5 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -9,7 +9,6 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -18,6 +17,8 @@ import ( grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/consul/state" @@ -41,10 +42,10 @@ var ( const ( // meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway. // This wait provides some time for the first gateway address to configure a route to the peer servers. - // Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation: - // https://www.hashicorp.com/cgsb - meshGatewayWait = 350 * time.Millisecond - establishmentTimeout = 5 * time.Second + // This study shows latency distribution https://www.hashicorp.com/cgsb. + // With 1s we cover ~p96, then we initiate the 3-second retry loop. + meshGatewayWait = 1 * time.Second + establishmentTimeout = 3 * time.Second ) // errPeeringInvalidServerAddress is returned when an establish request contains @@ -140,10 +141,10 @@ type Backend interface { DecodeToken([]byte) (*structs.PeeringToken, error) // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, - // a boolean indicating whether mesh gateways are present, and an optional error. + // an optional buffer of just gateway addresses, and an optional error. // The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local // datacenter is configured to dial through mesh gateways. - GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) + GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error) EnterpriseCheckPartitions(partition string) error @@ -516,11 +517,28 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) } - ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID) + allAddrs, gatewayAddrs, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID) if err != nil { return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err) } + if gatewayAddrs != nil { + // If we are dialing through local gateways we sleep before issuing the first request. + // This gives the local gateways some time to configure a route to the peer servers. + time.Sleep(meshGatewayWait) + + // Exclusively try + resp, _ := retryExchange(ctx, &req, gatewayAddrs, tlsOption) + if resp != nil { + return resp, nil + } + } + + return retryExchange(ctx, &req, allAddrs, tlsOption) +} + +// retryExchange attempts a secret exchange in a retry loop, taking a new address from the ring buffer on each iteration +func retryExchange(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest, ringBuf *ring.Ring, tlsOption grpc.DialOption) (*pbpeerstream.ExchangeSecretResponse, error) { var ( resp *pbpeerstream.ExchangeSecretResponse dialErrors error @@ -529,12 +547,6 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, retryWait := 150 * time.Millisecond jitter := retry.NewJitter(25) - if usingGateways { - // If we are dialing through local gateways we sleep before issuing the first request. - // This gives the local gateways some time to configure a route to the peer servers. - time.Sleep(meshGatewayWait) - } - retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout) defer cancel() @@ -553,7 +565,7 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering, defer conn.Close() client := pbpeerstream.NewPeerStreamServiceClient(conn) - resp, err = client.ExchangeSecret(ctx, &req) + resp, err = client.ExchangeSecret(ctx, req) // If we got a permission denied error that means out establishment secret is invalid, so we do not retry. grpcErr, ok := grpcstatus.FromError(err) diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index d1b42d9f6d..faae32b632 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -514,7 +514,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { require.Error(t, err) testutil.RequireErrorContains(t, err, "connection refused") - require.Greater(t, time.Since(start), 5*time.Second) + require.Greater(t, time.Since(start), 3*time.Second) }) testutil.RunStep(t, "peering can be established from token", func(t *testing.T) { @@ -528,7 +528,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { // Capture peering token for re-use later peeringToken = tokenResp.PeeringToken - // The context timeout is short, it checks that we do not wait the 350ms that we do when peering through mesh gateways + // The context timeout is short, it checks that we do not wait the 1s that we do when peering through mesh gateways ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond) t.Cleanup(cancel) @@ -585,7 +585,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { }, })) - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second) t.Cleanup(cancel) // Call to establish should succeed when we fall back to remote server address. @@ -630,7 +630,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { }, })) - ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + // Context is 1s sleep + 3s retry loop. Any longer and we're trying the remote gateway + ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second) t.Cleanup(cancel) start := time.Now() @@ -642,8 +643,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) { }) require.NoError(t, err) - // Dialing through a gateway is preceded by a mandatory 350ms sleep. - require.Greater(t, time.Since(start), 350*time.Millisecond) + // Dialing through a gateway is preceded by a mandatory 1s sleep. + require.Greater(t, time.Since(start), 1*time.Second) // target.called is true when the tcproxy's conn handler was invoked. // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.