From b0b0cbb8eee154879e414c621eee3a37c915c8a9 Mon Sep 17 00:00:00 2001
From: malizz
Date: Thu, 13 Oct 2022 13:46:51 -0700
Subject: [PATCH] increase protobuf size limit for cluster peering (#14976)

---
 agent/consul/leader_peering.go                   | 33 +++++++++++------
 agent/consul/leader_peering_test.go              | 35 +++++++++++++++----
 agent/grpc-external/server.go                    |  1 +
 .../services/peerstream/stream_resources.go      |  5 +++
 .../docs/connect/cluster-peering/index.mdx       |  2 +-
 5 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index 62cc051989..5bdcfe500c 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -338,9 +338,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
 		// Try a new address on each iteration by advancing the ring buffer on errors.
 		addr := <-nextServerAddr
 
-		logger.Trace("dialing peer", "addr", addr)
-		conn, err := grpc.DialContext(streamCtx, addr,
-			// TODO(peering): use a grpc.WithStatsHandler here?)
+		opts := []grpc.DialOption{
 			tlsOption,
 			// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
 			grpc.WithKeepaliveParams(keepalive.ClientParameters{
@@ -349,7 +347,12 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
 				// send keepalive pings even if there is no active streams
 				PermitWithoutStream: true,
 			}),
-		)
+			grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
+		}
+		// TODO(peering): use a grpc.WithStatsHandler here?)
+		logger.Trace("dialing peer", "addr", addr)
+		conn, err := grpc.DialContext(streamCtx, addr, opts...)
+
 		if err != nil {
 			return fmt.Errorf("failed to dial: %w", err)
 		}
@@ -397,10 +400,14 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
 		}, func(err error) {
 			// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
 			streamStatus.TrackSendError(err.Error())
-			if isFailedPreconditionErr(err) {
+			if isErrCode(err, codes.FailedPrecondition) {
 				logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
 					"error", err)
 				return
+			} else if isErrCode(err, codes.ResourceExhausted) {
+				logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
+					"error", err)
+				return
 			}
 			logger.Error("error managing peering stream", "error", err)
 		}, peeringRetryTimeout)
@@ -680,7 +687,7 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu
 // quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry
 // until our request lands on a leader.
 func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
-	if loopErr != nil && isFailedPreconditionErr(loopErr) {
+	if loopErr != nil && isErrCode(loopErr, codes.FailedPrecondition) {
 		// Wait a constant time for the first number of retries.
 		if failedAttempts <= maxFastConnRetries {
 			return fastConnRetryTimeout
@@ -697,6 +704,11 @@ func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
 		return ms
 	}
 
+	// if the message sent is too large probably should not retry at all
+	if loopErr != nil && isErrCode(loopErr, codes.ResourceExhausted) {
+		return maxFastRetryBackoff
+	}
+
 	// Else we go with the default backoff from retryLoopBackoff.
 	if (1 << failedAttempts) < maxRetryBackoff {
 		return (1 << failedAttempts) * time.Second
@@ -704,23 +716,22 @@ func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
 	return time.Duration(maxRetryBackoff) * time.Second
 }
 
-// isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition.
-func isFailedPreconditionErr(err error) bool {
+// isErrCode returns true if err is a gRPC error with given error code.
+func isErrCode(err error, code codes.Code) bool {
 	if err == nil {
 		return false
 	}
-
 	// Handle wrapped errors, since status.FromError does a naive assertion.
 	var statusErr interface {
 		GRPCStatus() *grpcstatus.Status
 	}
 	if errors.As(err, &statusErr) {
-		return statusErr.GRPCStatus().Code() == codes.FailedPrecondition
+		return statusErr.GRPCStatus().Code() == code
 	}
 
 	grpcErr, ok := grpcstatus.FromError(err)
 	if !ok {
 		return false
 	}
-	return grpcErr.Code() == codes.FailedPrecondition
+	return grpcErr.Code() == code
 }
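Note for reviewers: the sketch below is not part of the patch. It is a minimal, self-contained Go program showing how the generalized `isErrCode` helper above classifies both plain and `%w`-wrapped gRPC status errors, including the `ResourceExhausted` code that the dial-retry logic now treats specially; the sample error message and wrapping are illustrative only.

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

// isErrCode mirrors the helper in leader_peering.go: it reports whether err
// (possibly wrapped) is a gRPC status error with the given code.
func isErrCode(err error, code codes.Code) bool {
	if err == nil {
		return false
	}
	// Handle wrapped errors, since status.FromError does a naive assertion.
	var statusErr interface {
		GRPCStatus() *grpcstatus.Status
	}
	if errors.As(err, &statusErr) {
		return statusErr.GRPCStatus().Code() == code
	}
	grpcErr, ok := grpcstatus.FromError(err)
	if !ok {
		return false
	}
	return grpcErr.Code() == code
}

func main() {
	// Illustrative error: gRPC reports ResourceExhausted when a message
	// exceeds the receiver's MaxRecvMsgSize.
	sendErr := grpcstatus.Error(codes.ResourceExhausted, "received message larger than max")
	wrapped := fmt.Errorf("stream send failed: %w", sendErr)

	fmt.Println(isErrCode(wrapped, codes.ResourceExhausted))                          // true
	fmt.Println(isErrCode(wrapped, codes.FailedPrecondition))                         // false
	fmt.Println(isErrCode(errors.New("not a status error"), codes.ResourceExhausted)) // false
}
```

As the comment in the helper notes, `status.FromError` does a naive assertion, so it is the explicit `errors.As` check that lets the wrapped error above still match.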
diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go
index b6b70bd22e..5f59787764 100644
--- a/agent/consul/leader_peering_test.go
+++ b/agent/consul/leader_peering_test.go
@@ -1686,14 +1686,35 @@ func TestLeader_Peering_retryLoopBackoffPeering_cancelContext(t *testing.T) {
 	}, allErrors)
 }
 
-func Test_isFailedPreconditionErr(t *testing.T) {
-	st := grpcstatus.New(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
-	err := st.Err()
-	assert.True(t, isFailedPreconditionErr(err))
+func Test_isErrCode(t *testing.T) {
+	tests := []struct {
+		name         string
+		expectedCode codes.Code
+	}{
+		{
+			name:         "cannot establish a peering stream on a follower node",
+			expectedCode: codes.FailedPrecondition,
+		},
+		{
+			name:         "received message larger than max ",
+			expectedCode: codes.ResourceExhausted,
+		},
+		{
+			name:         "deadline exceeded",
+			expectedCode: codes.DeadlineExceeded,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			st := grpcstatus.New(tc.expectedCode, tc.name)
+			err := st.Err()
+			assert.True(t, isErrCode(err, tc.expectedCode))
 
-	// test that wrapped errors are checked correctly
-	werr := fmt.Errorf("wrapped: %w", err)
-	assert.True(t, isFailedPreconditionErr(werr))
+			// test that wrapped errors are checked correctly
+			werr := fmt.Errorf("wrapped: %w", err)
+			assert.True(t, isErrCode(werr, tc.expectedCode))
+		})
+	}
 }
 
 func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
diff --git a/agent/grpc-external/server.go b/agent/grpc-external/server.go
index 6342053373..59ca0dde2f 100644
--- a/agent/grpc-external/server.go
+++ b/agent/grpc-external/server.go
@@ -29,6 +29,7 @@ func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics) *grpc
 
 	opts := []grpc.ServerOption{
 		grpc.MaxConcurrentStreams(2048),
+		grpc.MaxRecvMsgSize(50 * 1024 * 1024),
 		grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
 		middleware.WithUnaryServerChain(
 			// Add middlware interceptors to recover in case of panics.
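The `MaxRecvMsgSize` server option above is the receiving-side counterpart of the `MaxCallSendMsgSize` dial option added in leader_peering.go: gRPC's default per-message receive limit is 4 MiB, which is what the old 4MB export ceiling in the docs reflected, so both ends have to be raised together. Below is a minimal sketch, not part of the patch, showing the two options side by side; the address and the insecure credentials are placeholders, since the real dialer uses the TLS option shown earlier.

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// 50 MiB, the limit used in this patch.
const maxPeeringMsgSize = 50 * 1024 * 1024

func main() {
	// Receiving side: raise the per-message receive limit above gRPC's
	// 4 MiB default so large peering snapshots are accepted.
	srv := grpc.NewServer(
		grpc.MaxRecvMsgSize(maxPeeringMsgSize),
	)
	defer srv.Stop()

	// Dialing side: raise the per-call send limit for every call made on
	// this connection via a default call option.
	conn, err := grpc.DialContext(context.Background(), "127.0.0.1:8502",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxPeeringMsgSize)),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()

	log.Println("client and server both configured for 50 MiB messages")
}
```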
diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go
index 143b236380..281156a222 100644
--- a/agent/grpc-external/services/peerstream/stream_resources.go
+++ b/agent/grpc-external/services/peerstream/stream_resources.go
@@ -367,6 +367,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
 			// resources, not request/ack messages.
 			if msg.GetResponse() != nil {
 				if err != nil {
+					if id := msg.GetResponse().GetResourceID(); id != "" {
+						logger.Error("failed to send resource", "resourceID", id, "error", err)
+						status.TrackSendError(err.Error())
+						return nil
+					}
 					status.TrackSendError(err.Error())
 				} else {
 					status.TrackSendSuccess()
diff --git a/website/content/docs/connect/cluster-peering/index.mdx b/website/content/docs/connect/cluster-peering/index.mdx
index 76cbf1de0d..dfff85d7b8 100644
--- a/website/content/docs/connect/cluster-peering/index.mdx
+++ b/website/content/docs/connect/cluster-peering/index.mdx
@@ -55,7 +55,7 @@ The cluster peering beta release includes the following features and functionali
 Not all features and functionality are available in the beta release. In particular, consider the following technical constraints:
 
 - Mesh gateways for _server to server traffic_ are not available.
-- Services with node, instance, and check definitions totaling more than 4MB cannot be exported to a peer.
+- Services with node, instance, and check definitions totaling more than 50MB cannot be exported to a peer.
 - The `service-splitter` and `service-router` configuration entry kinds cannot directly target a peer. To split or route traffic to a service instance on a peer, you must supplement your desired dynamic routing definition with a `service-resolver` config entry on the dialing cluster. Refer to [L7 traffic management between peers](/docs/connect/cluster-peering/create-manage-peering#L7-traffic) for more information.
 - The `consul intention` CLI command is not supported. To manage intentions that specify services in peered clusters, use [configuration entries](/docs/connect/config-entries/service-intentions).
 - Accessing key/value stores across peers is not supported.
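Finally, to make the stream_resources.go hunk easier to read in isolation: the sketch below uses hypothetical names and a stubbed status tracker rather than the real peerstream types, and mirrors the pattern that hunk adds. When sending a replicated resource fails, for example because the payload still exceeds the peer's receive limit, the handler logs which resource could not be delivered, records the failure, and stops instead of continuing to push messages that will keep failing.

```go
package main

import (
	"errors"
	"log"
)

// trackedStatus is a stand-in for the peering stream's status tracker, which
// in the real code exposes TrackSendError and TrackSendSuccess.
type trackedStatus struct{ lastSendError string }

func (s *trackedStatus) TrackSendError(msg string) { s.lastSendError = msg }
func (s *trackedStatus) TrackSendSuccess()         { s.lastSendError = "" }

// handleSendResult mirrors the control flow added above. It returns true when
// the stream handler should stop: a send failure for a named resource is
// logged with its ID, tracked, and ends the stream early.
func handleSendResult(resourceID string, sendErr error, status *trackedStatus) (stop bool) {
	if sendErr == nil {
		status.TrackSendSuccess()
		return false
	}
	if resourceID != "" {
		log.Printf("failed to send resource %q: %v", resourceID, sendErr)
		status.TrackSendError(sendErr.Error())
		// In the patch this is the early `return nil` out of the handler.
		return true
	}
	status.TrackSendError(sendErr.Error())
	return false
}

func main() {
	var st trackedStatus
	// Hypothetical resource ID and error, for illustration only.
	stop := handleSendResult("web-sidecar-proxy", errors.New("rpc error: code = ResourceExhausted"), &st)
	log.Println("stop stream:", stop, "- last tracked send error:", st.lastSendError)
}
```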