mirror of https://github.com/status-im/consul.git
Use rpcHoldTimeout to calculate blocking timeout (#15541)
Adds buffer to clients so that servers have time to respond to blocking queries.
This commit is contained in:
parent
6b477ceff8
commit
386da5439a
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
agent: Fixed issue where blocking queries with short waits could timeout on the client
|
||||||
|
```
|
|
@ -532,6 +532,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
||||||
Datacenter: c.Datacenter,
|
Datacenter: c.Datacenter,
|
||||||
DefaultQueryTime: c.DefaultQueryTime,
|
DefaultQueryTime: c.DefaultQueryTime,
|
||||||
MaxQueryTime: c.MaxQueryTime,
|
MaxQueryTime: c.MaxQueryTime,
|
||||||
|
RPCHoldTimeout: c.RPCHoldTimeout,
|
||||||
}
|
}
|
||||||
connPool.SetRPCClientTimeout(c.RPCClientTimeout)
|
connPool.SetRPCClientTimeout(c.RPCClientTimeout)
|
||||||
return Deps{
|
return Deps{
|
||||||
|
@ -881,8 +882,9 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
||||||
c.Datacenter = "dc1"
|
c.Datacenter = "dc1"
|
||||||
c.NodeName = uniqueNodeName(t.Name())
|
c.NodeName = uniqueNodeName(t.Name())
|
||||||
c.RPCClientTimeout = 10 * time.Millisecond
|
c.RPCClientTimeout = 10 * time.Millisecond
|
||||||
c.DefaultQueryTime = 100 * time.Millisecond
|
c.DefaultQueryTime = 150 * time.Millisecond
|
||||||
c.MaxQueryTime = 200 * time.Millisecond
|
c.MaxQueryTime = 200 * time.Millisecond
|
||||||
|
c.RPCHoldTimeout = 50 * time.Millisecond
|
||||||
})
|
})
|
||||||
joinLAN(t, c1, s1)
|
joinLAN(t, c1, s1)
|
||||||
|
|
||||||
|
@ -897,8 +899,8 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
||||||
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))
|
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))
|
||||||
|
|
||||||
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
|
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
|
||||||
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
|
// Requests with QueryOptions have a default timeout of
|
||||||
// so we expect the RPC call to timeout.
|
// RPCClientTimeout (10ms) so we expect the RPC call to timeout.
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
|
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
@ -931,8 +933,30 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
||||||
}, &out))
|
}, &out))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
|
t.Run("blocking query with MaxQueryTime succeeds", func(t *testing.T) {
|
||||||
var out struct{}
|
var out struct{}
|
||||||
|
// Although we set MaxQueryTime to 100ms, the client is adding maximum
|
||||||
|
// jitter (100ms / 16 = 6.25ms) as well as RPCHoldTimeout (50ms).
|
||||||
|
// Client waits 156.25ms while the server waits 106.25ms (artifically
|
||||||
|
// adds maximum jitter) so the server will always return first.
|
||||||
|
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
MinQueryIndex: 1,
|
||||||
|
MaxQueryTime: 100 * time.Millisecond,
|
||||||
|
},
|
||||||
|
}, &out))
|
||||||
|
})
|
||||||
|
|
||||||
|
// This following scenario should not occur in practice since the server
|
||||||
|
// should be aware of RPC timeouts and always return blocking queries before
|
||||||
|
// the client closes the connection. But this is just a hypothetical case
|
||||||
|
// to show waiter can fail since it does not consider QueryOptions.
|
||||||
|
t.Run("blocking query with low MaxQueryTime fails", func(t *testing.T) {
|
||||||
|
var out struct{}
|
||||||
|
// Although we set MaxQueryTime to 20ms, the client is adding maximum
|
||||||
|
// jitter (20ms / 16 = 1.25ms) as well as RPCHoldTimeout (50ms).
|
||||||
|
// Client waits 71.25ms while the server waits 106.25ms (artifically
|
||||||
|
// adds maximum jitter) so the client will error first.
|
||||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
MinQueryIndex: 1,
|
MinQueryIndex: 1,
|
||||||
|
@ -940,6 +964,6 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
||||||
},
|
},
|
||||||
}, &out)
|
}, &out)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
require.ErrorContains(t, err, "rpc error making call: i/o deadline reached")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,9 @@ type ConnPool struct {
|
||||||
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
|
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
|
|
||||||
|
// RPCHoldTimeout is used as a buffer when calculating timeouts to
|
||||||
|
// allow for leader rotation.
|
||||||
|
RPCHoldTimeout time.Duration
|
||||||
// MaxQueryTime is used for calculating timeouts on blocking queries.
|
// MaxQueryTime is used for calculating timeouts on blocking queries.
|
||||||
MaxQueryTime time.Duration
|
MaxQueryTime time.Duration
|
||||||
// DefaultQueryTime is used for calculating timeouts on blocking queries.
|
// DefaultQueryTime is used for calculating timeouts on blocking queries.
|
||||||
|
@ -630,8 +633,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
|
||||||
if bq, ok := args.(BlockableQuery); ok {
|
if bq, ok := args.(BlockableQuery); ok {
|
||||||
blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
|
blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
|
||||||
if blockingTimeout > 0 {
|
if blockingTimeout > 0 {
|
||||||
// override the default client timeout
|
// Override the default client timeout but add RPCHoldTimeout
|
||||||
timeout = blockingTimeout
|
// as a buffer for retries during leadership changes.
|
||||||
|
timeout = blockingTimeout + p.RPCHoldTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if timeout > 0 {
|
if timeout > 0 {
|
||||||
|
|
|
@ -186,6 +186,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
||||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||||
TLSConfigurator: tls,
|
TLSConfigurator: tls,
|
||||||
Datacenter: config.Datacenter,
|
Datacenter: config.Datacenter,
|
||||||
|
RPCHoldTimeout: config.RPCHoldTimeout,
|
||||||
MaxQueryTime: config.MaxQueryTime,
|
MaxQueryTime: config.MaxQueryTime,
|
||||||
DefaultQueryTime: config.DefaultQueryTime,
|
DefaultQueryTime: config.DefaultQueryTime,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue