From 24c431270c68f2e66d4b9e4a5c4ff82fa5b4997f Mon Sep 17 00:00:00 2001 From: Poonam Jadhav Date: Mon, 6 Feb 2023 11:31:25 -0500 Subject: [PATCH] feat: client RPC is retries on ErrRetryElsewhere error and forwardRequestToLeader method retries ErrRetryLater error (#16099) --- agent/consul/client.go | 12 +++++++++++- agent/consul/rpc.go | 20 +++++++++----------- agent/consul/rpc_test.go | 9 +++++++-- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/agent/consul/client.go b/agent/consul/client.go index 94f806a849..baf87bbfd9 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -16,6 +16,7 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/consul/acl" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -296,7 +297,16 @@ TRY: // Use the zero value for RPCInfo if the request doesn't implement RPCInfo info, _ := args.(structs.RPCInfo) - if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry { + retryableMessages := []error{ + // If we are chunking and it doesn't seem to have completed, try again. + ErrChunkingResubmit, + + // These rate limit errors are returned before the handler is called, so are + // safe to retry. + rpcRate.ErrRetryElsewhere, + } + + if retry := canRetry(info, rpcErr, firstCheck, c.config, retryableMessages); !retry { c.logger.Error("RPC failed to server", "method", method, "server", server.Addr, diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index e5c84ba682..3186d1cce3 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -557,7 +557,7 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { } // canRetry returns true if the request and error indicate that a retry is safe. -func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool { +func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool { if info != nil { timedOut, timeoutError := info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) if timeoutError != nil { @@ -579,15 +579,6 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) return true } - retryableMessages := []error{ - // If we are chunking and it doesn't seem to have completed, try again. - ErrChunkingResubmit, - - // These rate limit errors are returned before the handler is called, so are - // safe to retry. - rate.ErrRetryElsewhere, - rate.ErrRetryLater, - } for _, m := range retryableMessages { if err != nil && strings.Contains(err.Error(), m.Error()) { return true @@ -747,7 +738,14 @@ CHECK_LEADER: } } - if retry := canRetry(info, rpcErr, firstCheck, s.config); retry { + retryableMessages := []error{ + // If we are chunking and it doesn't seem to have completed, try again. + ErrChunkingResubmit, + + rate.ErrRetryLater, + } + + if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry { // Gate the request until there is a leader jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) select { diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 92821362da..18171725c5 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -31,6 +31,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/rate" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" agent_grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/pool" @@ -1287,12 +1288,16 @@ func TestCanRetry(t *testing.T) { config := DefaultConfig() now := time.Now() config.RPCHoldTimeout = 7 * time.Second + retryableMessages := []error{ + ErrChunkingResubmit, + rpcRate.ErrRetryElsewhere, + } run := func(t *testing.T, tc testCase) { timeOutValue := tc.timeout if timeOutValue.IsZero() { timeOutValue = now } - require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config)) + require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config, retryableMessages)) } var testCases = []testCase{ @@ -1319,7 +1324,7 @@ func TestCanRetry(t *testing.T) { { name: "ErrRetryLater", err: fmt.Errorf("some wrapping: %w", rate.ErrRetryLater), - expected: true, + expected: false, }, { name: "EOF on read request",