diff --git a/.changelog/14965.txt b/.changelog/14965.txt new file mode 100644 index 0000000000..ad5eb17ce6 --- /dev/null +++ b/.changelog/14965.txt @@ -0,0 +1,3 @@ +```release-note:feature +agent: Added a new config option `rpc_client_timeout` to tune timeouts for client RPC requests +``` diff --git a/agent/agent.go b/agent/agent.go index 06aa649f69..6d386cd9f7 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1407,6 +1407,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co // RPC-related performance configs. We allow explicit zero value to disable so // copy it whatever the value. cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout + cfg.RPCClientTimeout = runtimeCfg.RPCClientTimeout cfg.RPCConfig = runtimeCfg.RPCConfig @@ -4142,6 +4143,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { } cc := consul.ReloadableConfig{ + RPCClientTimeout: newCfg.RPCClientTimeout, RPCRateLimit: newCfg.RPCRateLimit, RPCMaxBurst: newCfg.RPCMaxBurst, RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient, diff --git a/agent/agent_test.go b/agent/agent_test.go index 702bdda126..2c941e9058 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4198,6 +4198,36 @@ func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) { require.True(t, a.consulConfig().AutoEncryptAllowTLS) } +func TestAgent_ReloadConfigRPCClientConfig(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + dataDir := testutil.TempDir(t, "agent") // we manage the data dir + hcl := ` + data_dir = "` + dataDir + `" + server = false + bootstrap = false + ` + a := NewTestAgent(t, hcl) + + defaultRPCTimeout := 60 * time.Second + require.Equal(t, defaultRPCTimeout, a.baseDeps.ConnPool.RPCClientTimeout()) + + hcl = ` + data_dir = "` + dataDir + `" + server = false + bootstrap = false + limits { + rpc_client_timeout = "2m" + } + ` + c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl}) + require.NoError(t, a.reloadConfigInternal(c)) + + require.Equal(t, 2*time.Minute, a.baseDeps.ConnPool.RPCClientTimeout()) +} + func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/config/builder.go b/agent/config/builder.go index 735a3ffa34..6ee7e8d9e6 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -19,7 +19,6 @@ import ( "time" "github.com/armon/go-metrics/prometheus" - hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" @@ -27,6 +26,8 @@ import ( "github.com/hashicorp/memberlist" "golang.org/x/time/rate" + hcpconfig "github.com/hashicorp/consul/agent/hcp/config" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/connect/ca" @@ -1030,6 +1031,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { RPCBindAddr: rpcBindAddr, RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout), RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout), + RPCClientTimeout: b.durationVal("limits.rpc_client_timeout", c.Limits.RPCClientTimeout), RPCMaxBurst: intVal(c.Limits.RPCMaxBurst), RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient), RPCProtocol: intVal(c.RPCProtocol), diff --git a/agent/config/config.go b/agent/config/config.go index b10b2c7c28..9aabab82b7 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -716,6 +716,7 @@ type UnixSocket struct { type Limits struct { HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"` HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"` + RPCClientTimeout *string `mapstructure:"rpc_client_timeout"` RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"` RPCMaxBurst *int `mapstructure:"rpc_max_burst"` RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"` diff --git a/agent/config/default.go b/agent/config/default.go index ad2b3a66ff..62f39f6cc9 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -98,6 +98,7 @@ func DefaultSource() Source { http_max_conns_per_client = 200 https_handshake_timeout = "5s" rpc_handshake_timeout = "5s" + rpc_client_timeout = "60s" rpc_rate = -1 rpc_max_burst = 1000 rpc_max_conns_per_client = 100 diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 2076ac0dec..5388f3d7c6 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -133,7 +133,7 @@ type RuntimeConfig struct { // AutopilotMinQuorum sets the minimum number of servers required in a cluster // before autopilot can prune dead servers. // - //hcl: autopilot { min_quorum = int } + // hcl: autopilot { min_quorum = int } AutopilotMinQuorum uint // AutopilotRedundancyZoneTag is the Meta tag to use for separating servers @@ -908,6 +908,18 @@ type RuntimeConfig struct { // hcl: performance { rpc_hold_timeout = "duration" } RPCHoldTimeout time.Duration + // RPCClientTimeout limits how long a client is allowed to read from an RPC + // connection. This is used to set an upper bound for requests to eventually + // terminate so that RPC connections are not held indefinitely. + // It may be set to 0 explicitly to disable the timeout but this should never + // be used in production. Default is 60 seconds. + // + // Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate + // timeouts. + // + // hcl: limits { rpc_client_timeout = "duration" } + RPCClientTimeout time.Duration + // RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed // to happen. In any large enough time interval, rate limiter limits the // rate to RPCRateLimit tokens per second, with a maximum burst size of @@ -1345,7 +1357,7 @@ type RuntimeConfig struct { SkipLeaveOnInt bool // AutoReloadConfig indicate if the config will be - //auto reloaded bases on config file modification + // auto reloaded bases on config file modification // hcl: auto_reload_config = (true|false) AutoReloadConfig bool diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 5970c3863f..3228dea0e7 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -19,9 +19,10 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/google/go-cmp/cmp/cmpopts" - hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/stretchr/testify/require" + hcpconfig "github.com/hashicorp/consul/agent/hcp/config" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" @@ -4577,6 +4578,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { // defaults are changed from these values forcing that change to be // intentional. rt.RPCHandshakeTimeout = 5 * time.Second + rt.RPCClientTimeout = 60 * time.Second rt.HTTPSHandshakeTimeout = 5 * time.Second rt.HTTPMaxConnsPerClient = 200 rt.RPCMaxConnsPerClient = 100 @@ -6115,6 +6117,7 @@ func TestLoad_FullConfig(t *testing.T) { RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"), RPCBindAddr: tcpAddr("16.99.34.17:3757"), RPCHandshakeTimeout: 1932 * time.Millisecond, + RPCClientTimeout: 62 * time.Second, RPCHoldTimeout: 15707 * time.Second, RPCProtocol: 30793, RPCRateLimit: 12029.43, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 562df97a03..a4c35e4b97 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -265,6 +265,7 @@ "RPCMaxConnsPerClient": 0, "RPCProtocol": 0, "RPCRateLimit": 0, + "RPCClientTimeout": "0s", "RaftBoltDBConfig": { "NoFreelistSync": false }, diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index 76cc7a19e3..1e38cd48ed 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -300,6 +300,7 @@ limits { http_max_conns_per_client = 100 https_handshake_timeout = "2391ms" rpc_handshake_timeout = "1932ms" + rpc_client_timeout = "62s" rpc_rate = 12029.43 rpc_max_burst = 44848 rpc_max_conns_per_client = 2954 diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index 8d8e5596f1..507aef157d 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -300,6 +300,7 @@ "http_max_conns_per_client": 100, "https_handshake_timeout": "2391ms", "rpc_handshake_timeout": "1932ms", + "rpc_client_timeout": "62s", "rpc_rate": 12029.43, "rpc_max_burst": 44848, "rpc_max_conns_per_client": 2954, diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index daa22c90c1..c1d1dbe983 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1690,7 +1690,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) { c.PrimaryDatacenter = "dc1" // Enable ACLs! c.ACLsEnabled = true c.Bootstrap = false // Disable bootstrap - c.RPCHoldTimeout = 10 * time.Millisecond }) defer os.RemoveAll(dir2) defer s2.Shutdown() diff --git a/agent/consul/client.go b/agent/consul/client.go index 7ce00af333..a2e5bb57e1 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -397,12 +397,12 @@ func (c *Client) Stats() map[string]map[string]string { // GetLANCoordinate returns the coordinate of the node in the LAN gossip // pool. // -// - Clients return a single coordinate for the single gossip pool they are -// in (default, segment, or partition). +// - Clients return a single coordinate for the single gossip pool they are +// in (default, segment, or partition). // -// - Servers return one coordinate for their canonical gossip pool (i.e. -// default partition/segment) and one per segment they are also ancillary -// members of. +// - Servers return one coordinate for their canonical gossip pool (i.e. +// default partition/segment) and one per segment they are also ancillary +// members of. // // NOTE: servers do not emit coordinates for partitioned gossip pools they // are ancillary members of. @@ -422,6 +422,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { // relevant configuration information func (c *Client) ReloadConfig(config ReloadableConfig) error { c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) + c.connPool.SetRPCClientTimeout(config.RPCClientTimeout) return nil } diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index ff150d545f..530fd6b892 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -50,7 +50,6 @@ func testClientConfig(t *testing.T) (string, *Config) { config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond - config.RPCHoldTimeout = 10 * time.Second return dir, config } @@ -531,11 +530,10 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { MaxStreams: 4, TLSConfigurator: tls, Datacenter: c.Datacenter, - Timeout: c.RPCHoldTimeout, DefaultQueryTime: c.DefaultQueryTime, MaxQueryTime: c.MaxQueryTime, } - + connPool.SetRPCClientTimeout(c.RPCClientTimeout) return Deps{ EventPublisher: stream.NewEventPublisher(10 * time.Second), Logger: logger, @@ -882,7 +880,7 @@ func TestClient_RPC_Timeout(t *testing.T) { _, c1 := testClientWithConfig(t, func(c *Config) { c.Datacenter = "dc1" c.NodeName = uniqueNodeName(t.Name()) - c.RPCHoldTimeout = 10 * time.Millisecond + c.RPCClientTimeout = 10 * time.Millisecond c.DefaultQueryTime = 100 * time.Millisecond c.MaxQueryTime = 200 * time.Millisecond }) @@ -895,34 +893,53 @@ func TestClient_RPC_Timeout(t *testing.T) { } }) - // waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime - require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond})) + require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 100 * time.Millisecond})) + require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond})) - // Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms) - // so we expect the RPC call to timeout. - var out struct{} - err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out) - require.Error(t, err) - require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) { + // Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms) + // so we expect the RPC call to timeout. + var out struct{} + err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out) + require.Error(t, err) + require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + }) - // Blocking requests have a longer timeout (100ms) so this should pass since we - // add the maximum jitter which should be 16ms - out = struct{}{} - err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{ - QueryOptions: structs.QueryOptions{ - MinQueryIndex: 1, - }, - }, &out) - require.NoError(t, err) + t.Run("non-blocking query succeeds", func(t *testing.T) { + var out struct{} + require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out)) + }) - // We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail - out = struct{}{} - err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{ - QueryOptions: structs.QueryOptions{ - MinQueryIndex: 1, - MaxQueryTime: 20 * time.Millisecond, - }, - }, &out) - require.Error(t, err) - require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + t.Run("check that deadline does not persist across calls", func(t *testing.T) { + var out struct{} + err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out) + require.Error(t, err) + require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + }, + }, &out)) + }) + + t.Run("blocking query succeeds", func(t *testing.T) { + var out struct{} + require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + }, + }, &out)) + }) + + t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) { + var out struct{} + err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + MaxQueryTime: 20 * time.Millisecond, + }, + }, &out) + require.Error(t, err) + require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + }) } diff --git a/agent/consul/config.go b/agent/consul/config.go index 69d4fddee5..85cddbb2ad 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -331,6 +331,13 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration + // RPCClientTimeout limits how long a client is allowed to read from an RPC + // connection. This is used to set an upper bound for non-blocking queries to + // eventually terminate so that RPC connections are not held indefinitely. + // Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate + // their own timeouts. + RPCClientTimeout time.Duration + // RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed // to happen. In any large enough time interval, rate limiter limits the // rate to RPCRateLimit tokens per second, with a maximum burst size of @@ -612,6 +619,7 @@ type RPCConfig struct { // ReloadableConfig is the configuration that is passed to ReloadConfig when // application config is reloaded. type ReloadableConfig struct { + RPCClientTimeout time.Duration RPCRateLimit rate.Limit RPCMaxBurst int RPCMaxConnsPerClient int diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 1e9239d92b..ff58615708 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1378,14 +1378,10 @@ func (r isReadRequest) IsRead() bool { return true } -func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { +func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) (bool, error) { return false, nil } -func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { - return time.Duration(-1) -} - func TestRPC_AuthorizeRaftRPC(t *testing.T) { caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"}) require.NoError(t, err) diff --git a/agent/consul/server.go b/agent/consul/server.go index 0972bab8da..9a1363c7ab 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -18,8 +18,8 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/agent/hcp" - connlimit "github.com/hashicorp/go-connlimit" + "github.com/hashicorp/consul-net-rpc/net/rpc" + "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-version" @@ -31,8 +31,6 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" - "github.com/hashicorp/consul-net-rpc/net/rpc" - "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" @@ -49,6 +47,7 @@ import ( "github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery" agentgrpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/services/subscribe" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -1646,6 +1645,7 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error { s.rpcConnLimiter.SetConfig(connlimit.Config{ MaxConnsPerClientIP: config.RPCMaxConnsPerClient, }) + s.connPool.SetRPCClientTimeout(config.RPCClientTimeout) if s.IsLeader() { // only bootstrap the config entries if we are the leader diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index c9b8a93247..faf1330705 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -16,9 +16,8 @@ import ( "github.com/armon/go-metrics" "github.com/google/tcpproxy" - "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/go-hclog" - uuid "github.com/hashicorp/go-uuid" + "github.com/hashicorp/go-uuid" "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" "github.com/stretchr/testify/mock" @@ -26,6 +25,8 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" + "github.com/hashicorp/consul/agent/hcp" + "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/connect" @@ -1815,6 +1816,7 @@ func TestServer_ReloadConfig(t *testing.T) { c.Build = "1.5.0" c.RPCRateLimit = 500 c.RPCMaxBurst = 5000 + c.RPCClientTimeout = 60 * time.Second // Set one raft param to be non-default in the initial config, others are // default. c.RaftConfig.TrailingLogs = 1234 @@ -1828,7 +1830,10 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, rate.Limit(500), limiter.Limit()) require.Equal(t, 5000, limiter.Burst()) + require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout()) + rc := ReloadableConfig{ + RPCClientTimeout: 2 * time.Minute, RPCRateLimit: 1000, RPCMaxBurst: 10000, ConfigEntryBootstrap: []structs.ConfigEntry{entryInit}, @@ -1857,6 +1862,9 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, rate.Limit(1000), limiter.Limit()) require.Equal(t, 10000, limiter.Burst()) + // Check RPC client timeout got updated + require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout()) + // Check raft config defaults := DefaultConfig() got := s.raft.ReloadableConfig() diff --git a/agent/pool/pool.go b/agent/pool/pool.go index fa223790d2..7c4b3e2d76 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/tlsutil" ) @@ -31,7 +32,7 @@ type muxSession interface { // streamClient is used to wrap a stream with an RPC client type StreamClient struct { - stream *TimeoutConn + stream net.Conn codec rpc.ClientCodec } @@ -56,36 +57,6 @@ type Conn struct { clientLock sync.Mutex } -// TimeoutConn wraps net.Conn with a read timeout. -// When set, FirstReadTimeout only applies to the very next Read. -// DefaultTimeout is used for any other Read. -type TimeoutConn struct { - net.Conn - DefaultTimeout time.Duration - FirstReadTimeout time.Duration -} - -func (c *TimeoutConn) Read(b []byte) (int, error) { - timeout := c.DefaultTimeout - // Apply timeout to first read then zero it out - if c.FirstReadTimeout > 0 { - timeout = c.FirstReadTimeout - c.FirstReadTimeout = 0 - } - var deadline time.Time - if timeout > 0 { - deadline = time.Now().Add(timeout) - } - if err := c.Conn.SetReadDeadline(deadline); err != nil { - return 0, err - } - return c.Conn.Read(b) -} - -func (c *TimeoutConn) Write(b []byte) (int, error) { - return c.Conn.Write(b) -} - func (c *Conn) Close() error { return c.session.Close() } @@ -109,14 +80,12 @@ func (c *Conn) getClient() (*StreamClient, error) { return nil, err } - timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout} - // Create the RPC client - codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle) + codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle) // Return a new stream client sc := &StreamClient{ - stream: timeoutStream, + stream: stream, codec: codec, } return sc, nil @@ -133,7 +102,7 @@ func (c *Conn) returnClient(client *StreamClient) { // If this is a Yamux stream, shrink the internal buffers so that // we can GC the idle memory - if ys, ok := client.stream.Conn.(*yamux.Stream); ok { + if ys, ok := client.stream.(*yamux.Stream); ok { ys.Shrink() } } @@ -158,6 +127,12 @@ func (c *Conn) markForUse() { // streams allowed. If TLS settings are provided outgoing connections // use TLS. type ConnPool struct { + // clientTimeoutMs is the default timeout for client RPC requests + // in milliseconds. Stored as an atomic uint32 value to allow for + // reloading. + // TODO: once we move to go1.19, change to atomic.Uint32. + clientTimeoutMs uint32 + // SrcAddr is the source address for outgoing connections. SrcAddr *net.TCPAddr @@ -165,11 +140,9 @@ type ConnPool struct { // TODO: consider refactoring to accept a full yamux.Config instead of a logger Logger *log.Logger - // The default timeout for stream reads/writes - Timeout time.Duration - - // Used for calculating timeouts on RPC requests - MaxQueryTime time.Duration + // MaxQueryTime is used for calculating timeouts on blocking queries. + MaxQueryTime time.Duration + // DefaultQueryTime is used for calculating timeouts on blocking queries. DefaultQueryTime time.Duration // The maximum time to keep a connection open @@ -364,7 +337,7 @@ func (p *ConnPool) dial( tlsRPCType RPCType, ) (net.Conn, HalfCloser, error) { // Try to dial the conn - d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout} + d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout} conn, err := d.Dial("tcp", addr.String()) if err != nil { return nil, nil, err @@ -417,6 +390,18 @@ func (p *ConnPool) dial( return conn, hc, nil } +func (p *ConnPool) RPCClientTimeout() time.Duration { + return time.Duration(atomic.LoadUint32(&p.clientTimeoutMs)) * time.Millisecond +} + +func (p *ConnPool) SetRPCClientTimeout(timeout time.Duration) { + if timeout > time.Hour { + // Prevent unreasonably large timeouts that might overflow a uint32 + timeout = time.Hour + } + atomic.StoreUint32(&p.clientTimeoutMs, uint32(timeout.Milliseconds())) +} + // DialRPCViaMeshGateway dials the destination node and sets up the connection // to be the correct RPC type using ALPN. This currently is exclusively used to // dial other servers in foreign datacenters via mesh gateways. @@ -620,6 +605,17 @@ func (p *ConnPool) rpcInsecure(dc string, addr net.Addr, method string, args int return nil } +// BlockableQuery represents a read query which can be blocking or non-blocking. +// This interface is used to override the rpc_client_timeout for blocking queries. +type BlockableQuery interface { + // BlockingTimeout returns duration > 0 if the query is blocking. + // Otherwise returns 0 for non-blocking queries. + BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration +} + +var _ BlockableQuery = (*structs.QueryOptions)(nil) +var _ BlockableQuery = (*pbcommon.QueryOptions)(nil) + func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error { p.once.Do(p.init) @@ -629,9 +625,20 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, return fmt.Errorf("rpc error getting client: %w", err) } - // Use the zero value if the request doesn't implement RPCInfo - if info, ok := args.(structs.RPCInfo); ok { - sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime) + var deadline time.Time + timeout := p.RPCClientTimeout() + if bq, ok := args.(BlockableQuery); ok { + blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime) + if blockingTimeout > 0 { + // override the default client timeout + timeout = blockingTimeout + } + } + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + if err := sc.stream.SetReadDeadline(deadline); err != nil { + return fmt.Errorf("rpc error setting read deadline: %w", err) } // Make the RPC call diff --git a/agent/setup.go b/agent/setup.go index 7d4a648cf5..22b1889380 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -8,7 +8,6 @@ import ( "time" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -24,6 +23,7 @@ import ( grpcInt "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/resolver" grpcWare "github.com/hashicorp/consul/agent/grpc-middleware" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -181,10 +181,10 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), TLSConfigurator: tls, Datacenter: config.Datacenter, - Timeout: config.RPCHoldTimeout, MaxQueryTime: config.MaxQueryTime, DefaultQueryTime: config.DefaultQueryTime, } + pool.SetRPCClientTimeout(config.RPCClientTimeout) if config.ServerMode { pool.MaxTime = 2 * time.Minute pool.MaxStreams = 64 diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 2764fa54d2..f8bd523661 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -245,7 +245,6 @@ type RPCInfo interface { TokenSecret() string SetTokenSecret(string) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) - Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration } // QueryOptions is used to specify various flags for read queries @@ -344,7 +343,8 @@ func (q *QueryOptions) SetTokenSecret(s string) { q.Token = s } -func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { +// BlockingTimeout implements pool.BlockableQuery +func (q QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration { // Match logic in Server.blockingQuery. if q.MinQueryIndex > 0 { if q.MaxQueryTime > maxQueryTime { @@ -355,13 +355,15 @@ func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime tim // Timeout after maximum jitter has elapsed. q.MaxQueryTime += q.MaxQueryTime / JitterFraction - return q.MaxQueryTime + rpcHoldTimeout + return q.MaxQueryTime } - return rpcHoldTimeout + return 0 } func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil + // In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer + // in case we need to wait for a leader election. + return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil } type WriteRequest struct { @@ -387,12 +389,8 @@ func (w *WriteRequest) SetTokenSecret(s string) { w.Token = s } -func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil -} - -func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { - return rpcHoldTimeout +func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { + return time.Since(start) > rpcHoldTimeout, nil } type QueryBackend int diff --git a/internal/tools/proto-gen-rpc-glue/main.go b/internal/tools/proto-gen-rpc-glue/main.go index 22e1f89807..a2df2ab896 100644 --- a/internal/tools/proto-gen-rpc-glue/main.go +++ b/internal/tools/proto-gen-rpc-glue/main.go @@ -385,14 +385,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b) } -// Timeout implements structs.RPCInfo -func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { - if msg == nil || msg.%[2]s == nil { - return 0 - } - return msg.%[2]s.Timeout(rpcHoldTimeout, a, b) -} - // IsRead implements structs.RPCInfo func (msg *%[1]s) IsRead() bool { return false @@ -441,14 +433,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b) } -// Timeout implements structs.RPCInfo -func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { - if msg == nil || msg.%[2]s == nil { - return 0 - } - return msg.%[2]s.Timeout(rpcHoldTimeout, a, b) -} - // SetTokenSecret implements structs.RPCInfo func (msg *%[1]s) SetTokenSecret(s string) { // TODO: initialize if nil @@ -494,12 +478,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t return time.Since(start) > rpcHoldTimeout, nil } -// Timeout implements structs.RPCInfo -func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { - // TODO(peering): figure out read semantics here - return rpcHoldTimeout -} - // SetTokenSecret implements structs.RPCInfo func (msg *%[1]s) SetTokenSecret(s string) { // TODO(peering): figure out read semantics here diff --git a/proto/pbautoconf/auto_config.go b/proto/pbautoconf/auto_config.go index 74a7cf4ab6..6c14c296f2 100644 --- a/proto/pbautoconf/auto_config.go +++ b/proto/pbautoconf/auto_config.go @@ -22,10 +22,6 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) { req.ConsulToken = token } -func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil -} - -func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { - return rpcHoldTimeout +func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { + return time.Since(start) > rpcHoldTimeout, nil } diff --git a/proto/pbcommon/common.go b/proto/pbcommon/common.go index faca038b6f..ea7c7399b4 100644 --- a/proto/pbcommon/common.go +++ b/proto/pbcommon/common.go @@ -75,16 +75,19 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) { } func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil + // In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer + // in case we need to wait for a leader election. + return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil } -func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { +// BlockingTimeout implements pool.BlockableQuery +func (q *QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration { maxTime := structs.DurationFromProto(q.MaxQueryTime) o := structs.QueryOptions{ MaxQueryTime: maxTime, MinQueryIndex: q.MinQueryIndex, } - return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime) + return o.BlockingTimeout(maxQueryTime, defaultQueryTime) } // SetFilter is needed to implement the structs.QueryOptionsCompat interface @@ -118,12 +121,7 @@ func (w *WriteRequest) AllowStaleRead() bool { // HasTimedOut implements structs.RPCInfo func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil -} - -// Timeout implements structs.RPCInfo -func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration { - return rpcHoldTimeout + return time.Since(start) > rpcHoldTimeout, nil } // IsRead implements structs.RPCInfo @@ -148,13 +146,8 @@ func (r *ReadRequest) SetTokenSecret(token string) { } // HasTimedOut implements structs.RPCInfo -func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil -} - -// Timeout implements structs.RPCInfo -func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration { - return rpcHoldTimeout +func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { + return time.Since(start) > rpcHoldTimeout, nil } // RequestDatacenter implements structs.RPCInfo diff --git a/proto/pbpeering/peering.rpcglue.pb.go b/proto/pbpeering/peering.rpcglue.pb.go index ca7498e0ba..1e6f94ce5d 100644 --- a/proto/pbpeering/peering.rpcglue.pb.go +++ b/proto/pbpeering/peering.rpcglue.pb.go @@ -33,12 +33,6 @@ func (msg *PeeringReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout time. return time.Since(start) > rpcHoldTimeout, nil } -// Timeout implements structs.RPCInfo -func (msg *PeeringReadRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { - // TODO(peering): figure out read semantics here - return rpcHoldTimeout -} - // SetTokenSecret implements structs.RPCInfo func (msg *PeeringReadRequest) SetTokenSecret(s string) { // TODO(peering): figure out read semantics here @@ -77,12 +71,6 @@ func (msg *PeeringListRequest) HasTimedOut(start time.Time, rpcHoldTimeout time. return time.Since(start) > rpcHoldTimeout, nil } -// Timeout implements structs.RPCInfo -func (msg *PeeringListRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { - // TODO(peering): figure out read semantics here - return rpcHoldTimeout -} - // SetTokenSecret implements structs.RPCInfo func (msg *PeeringListRequest) SetTokenSecret(s string) { // TODO(peering): figure out read semantics here diff --git a/proto/pbsubscribe/subscribe.go b/proto/pbsubscribe/subscribe.go index c8c1f44602..0b2b21bcff 100644 --- a/proto/pbsubscribe/subscribe.go +++ b/proto/pbsubscribe/subscribe.go @@ -32,13 +32,8 @@ func (req *SubscribeRequest) SetTokenSecret(token string) { } // HasTimedOut implements structs.RPCInfo -func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil -} - -// Timeout implements structs.RPCInfo -func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { - return rpcHoldTimeout +func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { + return time.Since(start) > rpcHoldTimeout, nil } // EnterpriseMeta returns the EnterpriseMeta encoded in the request's Subject. diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index 92c34cc967..20d49f7c9e 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -541,6 +541,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`. - `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`. - `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`. + - `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`. - `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though. - `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting. - `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate. @@ -610,7 +611,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." backwards-compatibility, but TLS support is deprecated and will be removed in a future release. Refer to `grpc_tls` for more information on configuring a TLS-enabled port. - `grpc_tls` ((#grpc_tls_port)) - The gRPC API with TLS connections, -1 to disable. Default -1 (disabled). - **We recommend using `8502` for `grpc_tls`** as your conventional gRPC port number, as it allows some + **We recommend using `8502` for `grpc_tls`** as your conventional gRPC port number, as it allows some tools to work automatically. `grpc_tls` is always guaranteed to be encrypted. Both `grpc` and `grpc_tls` can be configured at the same time, but they may not utilize the same port number. If both `grpc` and `grpc_tls` are defined, then `grpc` will always be plaintext. This field was added in Consul 1.14.