diff --git a/agent/consul/client.go b/agent/consul/client.go index baf87bbfd9..c1ea41f899 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -272,8 +272,10 @@ func (c *Client) RPC(ctx context.Context, method string, args interface{}, reply // starting the timer here we won't potentially double up the delay. // TODO (slackpad) Plumb a deadline here with a context. firstCheck := time.Now() - + retryCount := 0 + previousJitter := time.Duration(0) TRY: + retryCount++ manager, server := c.router.FindLANRoute() if server == nil { return structs.ErrNoServers @@ -323,7 +325,9 @@ TRY: ) // We can wait a bit and retry! - jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) + jitter := lib.RandomStaggerWithRange(previousJitter, getWaitTime(c.config.RPCHoldTimeout, retryCount)) + previousJitter = jitter + select { case <-time.After(jitter): goto TRY diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 3186d1cce3..2bad1a8b50 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math" "net" "strings" "sync/atomic" @@ -556,6 +557,21 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { return c.lr.Read(b) } +func getWaitTime(rpcHoldTimeout time.Duration, retryCount int) time.Duration { + const backoffMultiplier = 2.0 + + rpcHoldTimeoutInMilli := int(rpcHoldTimeout.Milliseconds()) + initialBackoffInMilli := rpcHoldTimeoutInMilli / structs.JitterFraction + + if initialBackoffInMilli < 1 { + initialBackoffInMilli = 1 + } + + waitTimeInMilli := initialBackoffInMilli * int(math.Pow(backoffMultiplier, float64(retryCount-1))) + + return time.Duration(waitTimeInMilli) * time.Millisecond +} + // canRetry returns true if the request and error indicate that a retry is safe. func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool { if info != nil { @@ -714,7 +730,10 @@ func (s *Server) canServeReadRequest(info structs.RPCInfo) bool { // See the comment for forwardRPC for more details. func (s *Server) forwardRequestToLeader(info structs.RPCInfo, forwardToLeader func(leader *metadata.Server) error) (handled bool, err error) { firstCheck := time.Now() + retryCount := 0 + previousJitter := time.Duration(0) CHECK_LEADER: + retryCount++ // Fail fast if we are in the process of leaving select { case <-s.leaveCh: @@ -747,7 +766,9 @@ CHECK_LEADER: if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry { // Gate the request until there is a leader - jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) + jitter := lib.RandomStaggerWithRange(previousJitter, getWaitTime(s.config.RPCHoldTimeout, retryCount)) + previousJitter = jitter + select { case <-time.After(jitter): goto CHECK_LEADER diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 18171725c5..843dd4c1f4 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1620,6 +1620,60 @@ func TestRPC_AuthorizeRaftRPC(t *testing.T) { } } +func TestGetWaitTime(t *testing.T) { + type testCase struct { + name string + RPCHoldTimeout time.Duration + expected time.Duration + retryCount int + } + config := DefaultConfig() + + run := func(t *testing.T, tc testCase) { + config.RPCHoldTimeout = tc.RPCHoldTimeout + require.Equal(t, tc.expected, getWaitTime(config.RPCHoldTimeout, tc.retryCount)) + } + + var testCases = []testCase{ + { + name: "init backoff small", + RPCHoldTimeout: 7 * time.Millisecond, + retryCount: 1, + expected: 1 * time.Millisecond, + }, + { + name: "first attempt", + RPCHoldTimeout: 7 * time.Second, + retryCount: 1, + expected: 437 * time.Millisecond, + }, + { + name: "second attempt", + RPCHoldTimeout: 7 * time.Second, + retryCount: 2, + expected: 874 * time.Millisecond, + }, + { + name: "third attempt", + RPCHoldTimeout: 7 * time.Second, + retryCount: 3, + expected: 1748 * time.Millisecond, + }, + { + name: "fourth attempt", + RPCHoldTimeout: 7 * time.Second, + retryCount: 4, + expected: 3496 * time.Millisecond, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + func doRaftRPC(conn net.Conn, leader string) (raft.AppendEntriesResponse, error) { var resp raft.AppendEntriesResponse diff --git a/lib/cluster.go b/lib/cluster.go index 79da2e458f..b69d17bfe2 100644 --- a/lib/cluster.go +++ b/lib/cluster.go @@ -45,6 +45,11 @@ func RandomStagger(intv time.Duration) time.Duration { return time.Duration(uint64(rand.Int63()) % uint64(intv)) } +// RandomStaggerWithRange returns an interval between min and the max duration +func RandomStaggerWithRange(min time.Duration, max time.Duration) time.Duration { + return RandomStagger(max-min) + min +} + // RateScaledInterval is used to choose an interval to perform an action in // order to target an aggregate number of actions per second across the whole // cluster. diff --git a/lib/cluster_test.go b/lib/cluster_test.go index 19a65179a4..02629c827f 100644 --- a/lib/cluster_test.go +++ b/lib/cluster_test.go @@ -1,6 +1,8 @@ package lib import ( + "github.com/stretchr/testify/require" + "math" "testing" "time" ) @@ -172,3 +174,25 @@ func TestRateScaledInterval(t *testing.T) { t.Fatalf("Bad: %v", v) } } + +func TestRandomStaggerWithRange(t *testing.T) { + type args struct { + min time.Duration + max time.Duration + } + tests := []struct { + name string + args args + }{ + {"min-max 0", args{time.Duration(0), time.Duration(0)}}, + {"min-max big", args{time.Duration(math.MaxInt64), time.Duration(math.MaxInt64)}}, + {"normal case", args{time.Duration(3), time.Duration(7)}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := RandomStaggerWithRange(tt.args.min, tt.args.max) + require.GreaterOrEqual(t, got, tt.args.min) + require.LessOrEqual(t, got, tt.args.max) + }) + } +}