diff --git a/.changelog/15541.txt b/.changelog/15541.txt new file mode 100644 index 0000000000..1ec33896e4 --- /dev/null +++ b/.changelog/15541.txt @@ -0,0 +1,3 @@ +```release-note:bug +agent: Fixed issue where blocking queries with short waits could timeout on the client +``` diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 530fd6b892..1944c5bc81 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -532,6 +532,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { Datacenter: c.Datacenter, DefaultQueryTime: c.DefaultQueryTime, MaxQueryTime: c.MaxQueryTime, + RPCHoldTimeout: c.RPCHoldTimeout, } connPool.SetRPCClientTimeout(c.RPCClientTimeout) return Deps{ @@ -881,8 +882,9 @@ func TestClient_RPC_Timeout(t *testing.T) { c.Datacenter = "dc1" c.NodeName = uniqueNodeName(t.Name()) c.RPCClientTimeout = 10 * time.Millisecond - c.DefaultQueryTime = 100 * time.Millisecond + c.DefaultQueryTime = 150 * time.Millisecond c.MaxQueryTime = 200 * time.Millisecond + c.RPCHoldTimeout = 50 * time.Millisecond }) joinLAN(t, c1, s1) @@ -897,8 +899,8 @@ func TestClient_RPC_Timeout(t *testing.T) { require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond})) t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) { - // Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms) - // so we expect the RPC call to timeout. + // Requests with QueryOptions have a default timeout of + // RPCClientTimeout (10ms) so we expect the RPC call to timeout. 
var out struct{} err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out) require.Error(t, err) @@ -931,8 +933,30 @@ func TestClient_RPC_Timeout(t *testing.T) { }, &out)) }) - t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) { + t.Run("blocking query with MaxQueryTime succeeds", func(t *testing.T) { var out struct{} + // Although we set MaxQueryTime to 100ms, the client is adding maximum + // jitter (100ms / 16 = 6.25ms) as well as RPCHoldTimeout (50ms). + // Client waits 156.25ms while the server waits 106.25ms (artificially + // adds maximum jitter) so the server will always return first. + require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + MaxQueryTime: 100 * time.Millisecond, + }, + }, &out)) + }) + + // The following scenario should not occur in practice since the server + // should be aware of RPC timeouts and always return blocking queries before + // the client closes the connection. But this is just a hypothetical case + // to show waiter can fail since it does not consider QueryOptions. + t.Run("blocking query with low MaxQueryTime fails", func(t *testing.T) { + var out struct{} + // Although we set MaxQueryTime to 20ms, the client is adding maximum + // jitter (20ms / 16 = 1.25ms) as well as RPCHoldTimeout (50ms). + // Client waits 71.25ms while the server waits 106.25ms (artificially + // adds maximum jitter) so the client will error first. 
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{ QueryOptions: structs.QueryOptions{ MinQueryIndex: 1, @@ -940,6 +964,6 @@ func TestClient_RPC_Timeout(t *testing.T) { }, }, &out) require.Error(t, err) - require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + require.ErrorContains(t, err, "rpc error making call: i/o deadline reached") }) } diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 068bc56c28..593838601f 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -140,6 +140,9 @@ type ConnPool struct { // TODO: consider refactoring to accept a full yamux.Config instead of a logger Logger *log.Logger + // RPCHoldTimeout is used as a buffer when calculating timeouts to + // allow for leader rotation. + RPCHoldTimeout time.Duration // MaxQueryTime is used for calculating timeouts on blocking queries. MaxQueryTime time.Duration // DefaultQueryTime is used for calculating timeouts on blocking queries. @@ -630,8 +633,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, if bq, ok := args.(BlockableQuery); ok { blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime) if blockingTimeout > 0 { - // override the default client timeout - timeout = blockingTimeout + // Override the default client timeout but add RPCHoldTimeout + // as a buffer for retries during leadership changes. + timeout = blockingTimeout + p.RPCHoldTimeout } } if timeout > 0 { diff --git a/agent/setup.go b/agent/setup.go index 90aca3d2ab..b014996dfa 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -186,6 +186,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), TLSConfigurator: tls, Datacenter: config.Datacenter, + RPCHoldTimeout: config.RPCHoldTimeout, MaxQueryTime: config.MaxQueryTime, DefaultQueryTime: config.DefaultQueryTime, }