Add timeout to Client RPC calls (#11500)

Adds a timeout (deadline) to client RPC calls, so that streams will no longer hang indefinitely in unstable network conditions.

Co-authored-by: kisunji <ckim@hashicorp.com>
This commit is contained in:
Will Jordan 2022-04-21 13:21:35 -07:00 committed by GitHub
parent 406b7e12b7
commit c48120d005
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 226 additions and 39 deletions

4
.changelog/11500.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:bugfix
rpc: Adds a deadline to client RPC calls, so that streams will no longer hang
indefinitely in unstable network conditions. [[GH-8504](https://github.com/hashicorp/consul/issues/8504)]
```

View File

@ -1650,6 +1650,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()

View File

@ -291,20 +291,26 @@ TRY:
}
// Move off to another server, and see if we can retry.
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
manager.NotifyFailedServer(server)
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info, _ := args.(structs.RPCInfo)
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
c.logger.Error("RPC failed to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
return rpcErr
}
c.logger.Warn("Retrying RPC to server",
"method", method,
"server", server.Addr,
"error", rpcErr,
)
// We can wait a bit and retry!
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {

View File

@ -48,6 +48,7 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}
@ -72,7 +73,7 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
}
// Apply config to copied fields because many tests only set the old
//values.
// values.
config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled
config.ACLResolverSettings.NodeName = config.NodeName
config.ACLResolverSettings.Datacenter = config.Datacenter
@ -521,13 +522,16 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
resolver.Register(builder)
connPool := &pool.ConnPool{
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}
return Deps{
@ -853,3 +857,67 @@ func TestClient_ShortReconnectTimeout(t *testing.T) {
50*time.Millisecond,
"The client node was not reaped within the alotted time")
}
type waiter struct {
duration time.Duration
}
func (w *waiter) Wait(struct{}, *struct{}) error {
time.Sleep(w.duration)
return nil
}
func TestClient_RPC_Timeout(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s1 := testServerWithConfig(t)
_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
joinLAN(t, c1, s1)
retry.Run(t, func(r *retry.R) {
var out struct{}
if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
r.Fatalf("err: %v", err)
}
})
// waiter will sleep for 50ms
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 50 * time.Millisecond}))
// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
// Blocking requests have a longer timeout (100ms) so this should pass
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)
// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
}

View File

@ -1374,6 +1374,10 @@ func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime
return false, nil
}
func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}
func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)

View File

@ -165,7 +165,7 @@ func testServerConfig(t *testing.T) (string, *Config) {
// TODO (slackpad) - We should be able to run all tests w/o this, but it
// looks like several depend on it.
config.RPCHoldTimeout = 5 * time.Second
config.RPCHoldTimeout = 10 * time.Second
config.ConnectEnabled = true
config.CAConfig = &structs.CAConfiguration{

View File

@ -31,7 +31,7 @@ type muxSession interface {
// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
stream *TimeoutConn
codec rpc.ClientCodec
}
@ -56,6 +56,36 @@ type Conn struct {
clientLock sync.Mutex
}
// TimeoutConn wraps net.Conn with a read timeout.
// When set, FirstReadTimeout only applies to the very next Read.
// DefaultTimeout is used for any other Read.
type TimeoutConn struct {
net.Conn
DefaultTimeout time.Duration
FirstReadTimeout time.Duration
}
func (c *TimeoutConn) Read(b []byte) (int, error) {
timeout := c.DefaultTimeout
// Apply timeout to first read then zero it out
if c.FirstReadTimeout > 0 {
timeout = c.FirstReadTimeout
c.FirstReadTimeout = 0
}
var deadline time.Time
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
if err := c.Conn.SetReadDeadline(deadline); err != nil {
return 0, err
}
return c.Conn.Read(b)
}
func (c *TimeoutConn) Write(b []byte) (int, error) {
return c.Conn.Write(b)
}
func (c *Conn) Close() error {
return c.session.Close()
}
@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) {
return nil, err
}
timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}
// Create the RPC client
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)
// Return a new stream client
sc := &StreamClient{
stream: stream,
stream: timeoutStream,
codec: codec,
}
return sc, nil
@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) {
// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.(*yamux.Stream); ok {
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
ys.Shrink()
}
}
@ -133,6 +165,13 @@ type ConnPool struct {
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
Logger *log.Logger
// The default timeout for stream reads/writes
Timeout time.Duration
// Used for calculating timeouts on RPC requests
MaxQueryTime time.Duration
DefaultQueryTime time.Duration
// The maximum time to keep a connection open
MaxTime time.Duration
@ -325,7 +364,7 @@ func (p *ConnPool) dial(
tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
return fmt.Errorf("rpc error getting client: %w", err)
}
// Use the zero value if the request doesn't implement RPCInfo
if info, ok := args.(structs.RPCInfo); ok {
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
}
// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil {

View File

@ -169,11 +169,14 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
}
pool := &pool.ConnPool{
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Server: config.ServerMode,
SrcAddr: rpcSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Timeout: config.RPCHoldTimeout,
MaxQueryTime: config.MaxQueryTime,
DefaultQueryTime: config.DefaultQueryTime,
}
if config.ServerMode {
pool.MaxTime = 2 * time.Minute

View File

@ -18,14 +18,12 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/golang/protobuf/proto"
ptypes "github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
ptypes "github.com/golang/protobuf/ptypes"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/api"
@ -217,6 +215,7 @@ type RPCInfo interface {
TokenSecret() string
SetTokenSecret(string)
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
}
// QueryOptions is used to specify various flags for read queries
@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) {
q.Token = s
}
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// Match logic in Server.blockingQuery.
if q.MinQueryIndex > 0 {
if q.MaxQueryTime > maxQueryTime {
q.MaxQueryTime = maxQueryTime
} else if q.MaxQueryTime <= 0 {
q.MaxQueryTime = defaultQueryTime
}
// Timeout after maximum jitter has elapsed.
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)
return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil
return q.MaxQueryTime + rpcHoldTimeout
}
return time.Since(start) > rpcHoldTimeout, nil
return rpcHoldTimeout
}
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
type WriteRequest struct {
@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) {
}
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
}
type QueryBackend int

View File

@ -344,6 +344,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
if msg == nil || msg.%[2]s == nil {
return 0
}
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
}
// IsRead implements structs.RPCInfo
func (msg *%[1]s) IsRead() bool {
return false
@ -392,6 +400,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
if msg == nil || msg.%[2]s == nil {
return 0
}
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
}
// SetTokenSecret implements structs.RPCInfo
func (msg *%[1]s) SetTokenSecret(s string) {
// TODO: initialize if nil
@ -443,6 +459,15 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
}
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
if msg == nil || msg.%[2]s == nil {
return 0
}
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
}
// SetTokenSecret implements structs.RPCInfo
func (msg *%[1]s) SetTokenSecret(s string) {
// TODO: initialize if nil

View File

@ -23,5 +23,9 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) {
}
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
}

View File

@ -75,12 +75,16 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
}
func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
maxTime := structs.DurationFromProto(q.MaxQueryTime)
o := structs.QueryOptions{
MaxQueryTime: maxTime,
MinQueryIndex: q.MinQueryIndex,
}
return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime)
return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime)
}
// SetFilter is needed to implement the structs.QueryOptionsCompat interface
@ -113,8 +117,13 @@ func (w *WriteRequest) AllowStaleRead() bool {
}
// HasTimedOut implements structs.RPCInfo
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
return rpcHoldTimeout
}
// IsRead implements structs.RPCInfo
@ -140,7 +149,12 @@ func (r *ReadRequest) SetTokenSecret(token string) {
// HasTimedOut implements structs.RPCInfo
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
return rpcHoldTimeout
}
// RequestDatacenter implements structs.RPCInfo

View File

@ -29,5 +29,10 @@ func (req *SubscribeRequest) SetTokenSecret(token string) {
// HasTimedOut implements structs.RPCInfo
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
}