mirror of https://github.com/status-im/consul.git
Refactor client RPC timeouts (#14965)
Fix an issue where rpc_hold_timeout was being used as the timeout for non-blocking queries. Users should be able to tune read timeouts without fiddling with rpc_hold_timeout. A new configuration `rpc_read_timeout` is created. Refactor some implementation from the original PR 11500 to remove the misleading linkage between RPCInfo's timeout (used to retry in case of certain modes of failures) and the client RPC timeouts.
This commit is contained in:
parent
0cca4c088d
commit
29a297d3e9
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
agent: Added a new config option `rpc_client_timeout` to tune timeouts for client RPC requests
|
||||
```
|
|
@ -1407,6 +1407,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
|
|||
// RPC-related performance configs. We allow explicit zero value to disable so
|
||||
// copy it whatever the value.
|
||||
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
|
||||
cfg.RPCClientTimeout = runtimeCfg.RPCClientTimeout
|
||||
|
||||
cfg.RPCConfig = runtimeCfg.RPCConfig
|
||||
|
||||
|
@ -4142,6 +4143,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
|||
}
|
||||
|
||||
cc := consul.ReloadableConfig{
|
||||
RPCClientTimeout: newCfg.RPCClientTimeout,
|
||||
RPCRateLimit: newCfg.RPCRateLimit,
|
||||
RPCMaxBurst: newCfg.RPCMaxBurst,
|
||||
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
|
||||
|
|
|
@ -4198,6 +4198,36 @@ func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
|
|||
require.True(t, a.consulConfig().AutoEncryptAllowTLS)
|
||||
}
|
||||
|
||||
func TestAgent_ReloadConfigRPCClientConfig(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
|
||||
hcl := `
|
||||
data_dir = "` + dataDir + `"
|
||||
server = false
|
||||
bootstrap = false
|
||||
`
|
||||
a := NewTestAgent(t, hcl)
|
||||
|
||||
defaultRPCTimeout := 60 * time.Second
|
||||
require.Equal(t, defaultRPCTimeout, a.baseDeps.ConnPool.RPCClientTimeout())
|
||||
|
||||
hcl = `
|
||||
data_dir = "` + dataDir + `"
|
||||
server = false
|
||||
bootstrap = false
|
||||
limits {
|
||||
rpc_client_timeout = "2m"
|
||||
}
|
||||
`
|
||||
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||
require.NoError(t, a.reloadConfigInternal(c))
|
||||
|
||||
require.Equal(t, 2*time.Minute, a.baseDeps.ConnPool.RPCClientTimeout())
|
||||
}
|
||||
|
||||
func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
@ -27,6 +26,8 @@ import (
|
|||
"github.com/hashicorp/memberlist"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
|
@ -1030,6 +1031,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
|||
RPCBindAddr: rpcBindAddr,
|
||||
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
|
||||
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
|
||||
RPCClientTimeout: b.durationVal("limits.rpc_client_timeout", c.Limits.RPCClientTimeout),
|
||||
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
|
||||
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
|
||||
RPCProtocol: intVal(c.RPCProtocol),
|
||||
|
|
|
@ -716,6 +716,7 @@ type UnixSocket struct {
|
|||
type Limits struct {
|
||||
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
|
||||
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
|
||||
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
|
||||
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
|
||||
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
|
||||
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
|
||||
|
|
|
@ -98,6 +98,7 @@ func DefaultSource() Source {
|
|||
http_max_conns_per_client = 200
|
||||
https_handshake_timeout = "5s"
|
||||
rpc_handshake_timeout = "5s"
|
||||
rpc_client_timeout = "60s"
|
||||
rpc_rate = -1
|
||||
rpc_max_burst = 1000
|
||||
rpc_max_conns_per_client = 100
|
||||
|
|
|
@ -133,7 +133,7 @@ type RuntimeConfig struct {
|
|||
// AutopilotMinQuorum sets the minimum number of servers required in a cluster
|
||||
// before autopilot can prune dead servers.
|
||||
//
|
||||
//hcl: autopilot { min_quorum = int }
|
||||
// hcl: autopilot { min_quorum = int }
|
||||
AutopilotMinQuorum uint
|
||||
|
||||
// AutopilotRedundancyZoneTag is the Meta tag to use for separating servers
|
||||
|
@ -908,6 +908,18 @@ type RuntimeConfig struct {
|
|||
// hcl: performance { rpc_hold_timeout = "duration" }
|
||||
RPCHoldTimeout time.Duration
|
||||
|
||||
// RPCClientTimeout limits how long a client is allowed to read from an RPC
|
||||
// connection. This is used to set an upper bound for requests to eventually
|
||||
// terminate so that RPC connections are not held indefinitely.
|
||||
// It may be set to 0 explicitly to disable the timeout but this should never
|
||||
// be used in production. Default is 60 seconds.
|
||||
//
|
||||
// Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate
|
||||
// timeouts.
|
||||
//
|
||||
// hcl: limits { rpc_client_timeout = "duration" }
|
||||
RPCClientTimeout time.Duration
|
||||
|
||||
// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
|
||||
// to happen. In any large enough time interval, rate limiter limits the
|
||||
// rate to RPCRateLimit tokens per second, with a maximum burst size of
|
||||
|
@ -1345,7 +1357,7 @@ type RuntimeConfig struct {
|
|||
SkipLeaveOnInt bool
|
||||
|
||||
// AutoReloadConfig indicate if the config will be
|
||||
//auto reloaded bases on config file modification
|
||||
// auto reloaded bases on config file modification
|
||||
// hcl: auto_reload_config = (true|false)
|
||||
AutoReloadConfig bool
|
||||
|
||||
|
|
|
@ -19,9 +19,10 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
|
@ -4577,6 +4578,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
// defaults are changed from these values forcing that change to be
|
||||
// intentional.
|
||||
rt.RPCHandshakeTimeout = 5 * time.Second
|
||||
rt.RPCClientTimeout = 60 * time.Second
|
||||
rt.HTTPSHandshakeTimeout = 5 * time.Second
|
||||
rt.HTTPMaxConnsPerClient = 200
|
||||
rt.RPCMaxConnsPerClient = 100
|
||||
|
@ -6115,6 +6117,7 @@ func TestLoad_FullConfig(t *testing.T) {
|
|||
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
|
||||
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
|
||||
RPCHandshakeTimeout: 1932 * time.Millisecond,
|
||||
RPCClientTimeout: 62 * time.Second,
|
||||
RPCHoldTimeout: 15707 * time.Second,
|
||||
RPCProtocol: 30793,
|
||||
RPCRateLimit: 12029.43,
|
||||
|
|
|
@ -265,6 +265,7 @@
|
|||
"RPCMaxConnsPerClient": 0,
|
||||
"RPCProtocol": 0,
|
||||
"RPCRateLimit": 0,
|
||||
"RPCClientTimeout": "0s",
|
||||
"RaftBoltDBConfig": {
|
||||
"NoFreelistSync": false
|
||||
},
|
||||
|
|
|
@ -300,6 +300,7 @@ limits {
|
|||
http_max_conns_per_client = 100
|
||||
https_handshake_timeout = "2391ms"
|
||||
rpc_handshake_timeout = "1932ms"
|
||||
rpc_client_timeout = "62s"
|
||||
rpc_rate = 12029.43
|
||||
rpc_max_burst = 44848
|
||||
rpc_max_conns_per_client = 2954
|
||||
|
|
|
@ -300,6 +300,7 @@
|
|||
"http_max_conns_per_client": 100,
|
||||
"https_handshake_timeout": "2391ms",
|
||||
"rpc_handshake_timeout": "1932ms",
|
||||
"rpc_client_timeout": "62s",
|
||||
"rpc_rate": 12029.43,
|
||||
"rpc_max_burst": 44848,
|
||||
"rpc_max_conns_per_client": 2954,
|
||||
|
|
|
@ -1690,7 +1690,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
c.PrimaryDatacenter = "dc1" // Enable ACLs!
|
||||
c.ACLsEnabled = true
|
||||
c.Bootstrap = false // Disable bootstrap
|
||||
c.RPCHoldTimeout = 10 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
|
|
@ -397,12 +397,12 @@ func (c *Client) Stats() map[string]map[string]string {
|
|||
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
|
||||
// pool.
|
||||
//
|
||||
// - Clients return a single coordinate for the single gossip pool they are
|
||||
// in (default, segment, or partition).
|
||||
// - Clients return a single coordinate for the single gossip pool they are
|
||||
// in (default, segment, or partition).
|
||||
//
|
||||
// - Servers return one coordinate for their canonical gossip pool (i.e.
|
||||
// default partition/segment) and one per segment they are also ancillary
|
||||
// members of.
|
||||
// - Servers return one coordinate for their canonical gossip pool (i.e.
|
||||
// default partition/segment) and one per segment they are also ancillary
|
||||
// members of.
|
||||
//
|
||||
// NOTE: servers do not emit coordinates for partitioned gossip pools they
|
||||
// are ancillary members of.
|
||||
|
@ -422,6 +422,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
|
|||
// relevant configuration information
|
||||
func (c *Client) ReloadConfig(config ReloadableConfig) error {
|
||||
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
|
||||
c.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,6 @@ func testClientConfig(t *testing.T) (string, *Config) {
|
|||
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
|
||||
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
|
||||
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
|
||||
config.RPCHoldTimeout = 10 * time.Second
|
||||
return dir, config
|
||||
}
|
||||
|
||||
|
@ -531,11 +530,10 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
|||
MaxStreams: 4,
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: c.Datacenter,
|
||||
Timeout: c.RPCHoldTimeout,
|
||||
DefaultQueryTime: c.DefaultQueryTime,
|
||||
MaxQueryTime: c.MaxQueryTime,
|
||||
}
|
||||
|
||||
connPool.SetRPCClientTimeout(c.RPCClientTimeout)
|
||||
return Deps{
|
||||
EventPublisher: stream.NewEventPublisher(10 * time.Second),
|
||||
Logger: logger,
|
||||
|
@ -882,7 +880,7 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
_, c1 := testClientWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.NodeName = uniqueNodeName(t.Name())
|
||||
c.RPCHoldTimeout = 10 * time.Millisecond
|
||||
c.RPCClientTimeout = 10 * time.Millisecond
|
||||
c.DefaultQueryTime = 100 * time.Millisecond
|
||||
c.MaxQueryTime = 200 * time.Millisecond
|
||||
})
|
||||
|
@ -895,34 +893,53 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
// waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime
|
||||
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond}))
|
||||
require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 100 * time.Millisecond}))
|
||||
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))
|
||||
|
||||
// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
|
||||
// so we expect the RPC call to timeout.
|
||||
var out struct{}
|
||||
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
|
||||
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
|
||||
// so we expect the RPC call to timeout.
|
||||
var out struct{}
|
||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
})
|
||||
|
||||
// Blocking requests have a longer timeout (100ms) so this should pass since we
|
||||
// add the maximum jitter which should be 16ms
|
||||
out = struct{}{}
|
||||
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
},
|
||||
}, &out)
|
||||
require.NoError(t, err)
|
||||
t.Run("non-blocking query succeeds", func(t *testing.T) {
|
||||
var out struct{}
|
||||
require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out))
|
||||
})
|
||||
|
||||
// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
|
||||
out = struct{}{}
|
||||
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
MaxQueryTime: 20 * time.Millisecond,
|
||||
},
|
||||
}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
t.Run("check that deadline does not persist across calls", func(t *testing.T) {
|
||||
var out struct{}
|
||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
},
|
||||
}, &out))
|
||||
})
|
||||
|
||||
t.Run("blocking query succeeds", func(t *testing.T) {
|
||||
var out struct{}
|
||||
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
},
|
||||
}, &out))
|
||||
})
|
||||
|
||||
t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
|
||||
var out struct{}
|
||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
MaxQueryTime: 20 * time.Millisecond,
|
||||
},
|
||||
}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -331,6 +331,13 @@ type Config struct {
|
|||
// place, and a small jitter is applied to avoid a thundering herd.
|
||||
RPCHoldTimeout time.Duration
|
||||
|
||||
// RPCClientTimeout limits how long a client is allowed to read from an RPC
|
||||
// connection. This is used to set an upper bound for non-blocking queries to
|
||||
// eventually terminate so that RPC connections are not held indefinitely.
|
||||
// Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate
|
||||
// their own timeouts.
|
||||
RPCClientTimeout time.Duration
|
||||
|
||||
// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
|
||||
// to happen. In any large enough time interval, rate limiter limits the
|
||||
// rate to RPCRateLimit tokens per second, with a maximum burst size of
|
||||
|
@ -612,6 +619,7 @@ type RPCConfig struct {
|
|||
// ReloadableConfig is the configuration that is passed to ReloadConfig when
|
||||
// application config is reloaded.
|
||||
type ReloadableConfig struct {
|
||||
RPCClientTimeout time.Duration
|
||||
RPCRateLimit rate.Limit
|
||||
RPCMaxBurst int
|
||||
RPCMaxConnsPerClient int
|
||||
|
|
|
@ -1378,14 +1378,10 @@ func (r isReadRequest) IsRead() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return time.Duration(-1)
|
||||
}
|
||||
|
||||
func TestRPC_AuthorizeRaftRPC(t *testing.T) {
|
||||
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -18,8 +18,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-version"
|
||||
|
@ -31,8 +31,6 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
|
@ -49,6 +47,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
|
@ -1646,6 +1645,7 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error {
|
|||
s.rpcConnLimiter.SetConfig(connlimit.Config{
|
||||
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
|
||||
})
|
||||
s.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
|
||||
|
||||
if s.IsLeader() {
|
||||
// only bootstrap the config entries if we are the leader
|
||||
|
|
|
@ -16,9 +16,8 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/google/tcpproxy"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -26,6 +25,8 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
@ -1815,6 +1816,7 @@ func TestServer_ReloadConfig(t *testing.T) {
|
|||
c.Build = "1.5.0"
|
||||
c.RPCRateLimit = 500
|
||||
c.RPCMaxBurst = 5000
|
||||
c.RPCClientTimeout = 60 * time.Second
|
||||
// Set one raft param to be non-default in the initial config, others are
|
||||
// default.
|
||||
c.RaftConfig.TrailingLogs = 1234
|
||||
|
@ -1828,7 +1830,10 @@ func TestServer_ReloadConfig(t *testing.T) {
|
|||
require.Equal(t, rate.Limit(500), limiter.Limit())
|
||||
require.Equal(t, 5000, limiter.Burst())
|
||||
|
||||
require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout())
|
||||
|
||||
rc := ReloadableConfig{
|
||||
RPCClientTimeout: 2 * time.Minute,
|
||||
RPCRateLimit: 1000,
|
||||
RPCMaxBurst: 10000,
|
||||
ConfigEntryBootstrap: []structs.ConfigEntry{entryInit},
|
||||
|
@ -1857,6 +1862,9 @@ func TestServer_ReloadConfig(t *testing.T) {
|
|||
require.Equal(t, rate.Limit(1000), limiter.Limit())
|
||||
require.Equal(t, 10000, limiter.Burst())
|
||||
|
||||
// Check RPC client timeout got updated
|
||||
require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout())
|
||||
|
||||
// Check raft config
|
||||
defaults := DefaultConfig()
|
||||
got := s.raft.ReloadableConfig()
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
|
@ -31,7 +32,7 @@ type muxSession interface {
|
|||
|
||||
// streamClient is used to wrap a stream with an RPC client
|
||||
type StreamClient struct {
|
||||
stream *TimeoutConn
|
||||
stream net.Conn
|
||||
codec rpc.ClientCodec
|
||||
}
|
||||
|
||||
|
@ -56,36 +57,6 @@ type Conn struct {
|
|||
clientLock sync.Mutex
|
||||
}
|
||||
|
||||
// TimeoutConn wraps net.Conn with a read timeout.
|
||||
// When set, FirstReadTimeout only applies to the very next Read.
|
||||
// DefaultTimeout is used for any other Read.
|
||||
type TimeoutConn struct {
|
||||
net.Conn
|
||||
DefaultTimeout time.Duration
|
||||
FirstReadTimeout time.Duration
|
||||
}
|
||||
|
||||
func (c *TimeoutConn) Read(b []byte) (int, error) {
|
||||
timeout := c.DefaultTimeout
|
||||
// Apply timeout to first read then zero it out
|
||||
if c.FirstReadTimeout > 0 {
|
||||
timeout = c.FirstReadTimeout
|
||||
c.FirstReadTimeout = 0
|
||||
}
|
||||
var deadline time.Time
|
||||
if timeout > 0 {
|
||||
deadline = time.Now().Add(timeout)
|
||||
}
|
||||
if err := c.Conn.SetReadDeadline(deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
func (c *TimeoutConn) Write(b []byte) (int, error) {
|
||||
return c.Conn.Write(b)
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
return c.session.Close()
|
||||
}
|
||||
|
@ -109,14 +80,12 @@ func (c *Conn) getClient() (*StreamClient, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}
|
||||
|
||||
// Create the RPC client
|
||||
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)
|
||||
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
|
||||
|
||||
// Return a new stream client
|
||||
sc := &StreamClient{
|
||||
stream: timeoutStream,
|
||||
stream: stream,
|
||||
codec: codec,
|
||||
}
|
||||
return sc, nil
|
||||
|
@ -133,7 +102,7 @@ func (c *Conn) returnClient(client *StreamClient) {
|
|||
|
||||
// If this is a Yamux stream, shrink the internal buffers so that
|
||||
// we can GC the idle memory
|
||||
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
|
||||
if ys, ok := client.stream.(*yamux.Stream); ok {
|
||||
ys.Shrink()
|
||||
}
|
||||
}
|
||||
|
@ -158,6 +127,12 @@ func (c *Conn) markForUse() {
|
|||
// streams allowed. If TLS settings are provided outgoing connections
|
||||
// use TLS.
|
||||
type ConnPool struct {
|
||||
// clientTimeoutMs is the default timeout for client RPC requests
|
||||
// in milliseconds. Stored as an atomic uint32 value to allow for
|
||||
// reloading.
|
||||
// TODO: once we move to go1.19, change to atomic.Uint32.
|
||||
clientTimeoutMs uint32
|
||||
|
||||
// SrcAddr is the source address for outgoing connections.
|
||||
SrcAddr *net.TCPAddr
|
||||
|
||||
|
@ -165,11 +140,9 @@ type ConnPool struct {
|
|||
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
|
||||
Logger *log.Logger
|
||||
|
||||
// The default timeout for stream reads/writes
|
||||
Timeout time.Duration
|
||||
|
||||
// Used for calculating timeouts on RPC requests
|
||||
MaxQueryTime time.Duration
|
||||
// MaxQueryTime is used for calculating timeouts on blocking queries.
|
||||
MaxQueryTime time.Duration
|
||||
// DefaultQueryTime is used for calculating timeouts on blocking queries.
|
||||
DefaultQueryTime time.Duration
|
||||
|
||||
// The maximum time to keep a connection open
|
||||
|
@ -364,7 +337,7 @@ func (p *ConnPool) dial(
|
|||
tlsRPCType RPCType,
|
||||
) (net.Conn, HalfCloser, error) {
|
||||
// Try to dial the conn
|
||||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
|
||||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
|
||||
conn, err := d.Dial("tcp", addr.String())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -417,6 +390,18 @@ func (p *ConnPool) dial(
|
|||
return conn, hc, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) RPCClientTimeout() time.Duration {
|
||||
return time.Duration(atomic.LoadUint32(&p.clientTimeoutMs)) * time.Millisecond
|
||||
}
|
||||
|
||||
func (p *ConnPool) SetRPCClientTimeout(timeout time.Duration) {
|
||||
if timeout > time.Hour {
|
||||
// Prevent unreasonably large timeouts that might overflow a uint32
|
||||
timeout = time.Hour
|
||||
}
|
||||
atomic.StoreUint32(&p.clientTimeoutMs, uint32(timeout.Milliseconds()))
|
||||
}
|
||||
|
||||
// DialRPCViaMeshGateway dials the destination node and sets up the connection
|
||||
// to be the correct RPC type using ALPN. This currently is exclusively used to
|
||||
// dial other servers in foreign datacenters via mesh gateways.
|
||||
|
@ -620,6 +605,17 @@ func (p *ConnPool) rpcInsecure(dc string, addr net.Addr, method string, args int
|
|||
return nil
|
||||
}
|
||||
|
||||
// BlockableQuery represents a read query which can be blocking or non-blocking.
|
||||
// This interface is used to override the rpc_client_timeout for blocking queries.
|
||||
type BlockableQuery interface {
|
||||
// BlockingTimeout returns duration > 0 if the query is blocking.
|
||||
// Otherwise returns 0 for non-blocking queries.
|
||||
BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration
|
||||
}
|
||||
|
||||
var _ BlockableQuery = (*structs.QueryOptions)(nil)
|
||||
var _ BlockableQuery = (*pbcommon.QueryOptions)(nil)
|
||||
|
||||
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
|
||||
p.once.Do(p.init)
|
||||
|
||||
|
@ -629,9 +625,20 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
|
|||
return fmt.Errorf("rpc error getting client: %w", err)
|
||||
}
|
||||
|
||||
// Use the zero value if the request doesn't implement RPCInfo
|
||||
if info, ok := args.(structs.RPCInfo); ok {
|
||||
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
|
||||
var deadline time.Time
|
||||
timeout := p.RPCClientTimeout()
|
||||
if bq, ok := args.(BlockableQuery); ok {
|
||||
blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
|
||||
if blockingTimeout > 0 {
|
||||
// override the default client timeout
|
||||
timeout = blockingTimeout
|
||||
}
|
||||
}
|
||||
if timeout > 0 {
|
||||
deadline = time.Now().Add(timeout)
|
||||
}
|
||||
if err := sc.stream.SetReadDeadline(deadline); err != nil {
|
||||
return fmt.Errorf("rpc error setting read deadline: %w", err)
|
||||
}
|
||||
|
||||
// Make the RPC call
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
|
@ -24,6 +23,7 @@ import (
|
|||
grpcInt "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
|
||||
grpcWare "github.com/hashicorp/consul/agent/grpc-middleware"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
|
@ -181,10 +181,10 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
|||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: config.Datacenter,
|
||||
Timeout: config.RPCHoldTimeout,
|
||||
MaxQueryTime: config.MaxQueryTime,
|
||||
DefaultQueryTime: config.DefaultQueryTime,
|
||||
}
|
||||
pool.SetRPCClientTimeout(config.RPCClientTimeout)
|
||||
if config.ServerMode {
|
||||
pool.MaxTime = 2 * time.Minute
|
||||
pool.MaxStreams = 64
|
||||
|
|
|
@ -245,7 +245,6 @@ type RPCInfo interface {
|
|||
TokenSecret() string
|
||||
SetTokenSecret(string)
|
||||
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
|
||||
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
|
||||
}
|
||||
|
||||
// QueryOptions is used to specify various flags for read queries
|
||||
|
@ -344,7 +343,8 @@ func (q *QueryOptions) SetTokenSecret(s string) {
|
|||
q.Token = s
|
||||
}
|
||||
|
||||
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
// BlockingTimeout implements pool.BlockableQuery
|
||||
func (q QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
// Match logic in Server.blockingQuery.
|
||||
if q.MinQueryIndex > 0 {
|
||||
if q.MaxQueryTime > maxQueryTime {
|
||||
|
@ -355,13 +355,15 @@ func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime tim
|
|||
// Timeout after maximum jitter has elapsed.
|
||||
q.MaxQueryTime += q.MaxQueryTime / JitterFraction
|
||||
|
||||
return q.MaxQueryTime + rpcHoldTimeout
|
||||
return q.MaxQueryTime
|
||||
}
|
||||
return rpcHoldTimeout
|
||||
return 0
|
||||
}
|
||||
|
||||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
// In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer
|
||||
// in case we need to wait for a leader election.
|
||||
return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
type WriteRequest struct {
|
||||
|
@ -387,12 +389,8 @@ func (w *WriteRequest) SetTokenSecret(s string) {
|
|||
w.Token = s
|
||||
}
|
||||
|
||||
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
type QueryBackend int
|
||||
|
|
|
@ -385,14 +385,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
if msg == nil || msg.%[2]s == nil {
|
||||
return 0
|
||||
}
|
||||
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// IsRead implements structs.RPCInfo
|
||||
func (msg *%[1]s) IsRead() bool {
|
||||
return false
|
||||
|
@ -441,14 +433,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
if msg == nil || msg.%[2]s == nil {
|
||||
return 0
|
||||
}
|
||||
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *%[1]s) SetTokenSecret(s string) {
|
||||
// TODO: initialize if nil
|
||||
|
@ -494,12 +478,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
// TODO(peering): figure out read semantics here
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *%[1]s) SetTokenSecret(s string) {
|
||||
// TODO(peering): figure out read semantics here
|
||||
|
|
|
@ -22,10 +22,6 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) {
|
|||
req.ConsulToken = token
|
||||
}
|
||||
|
||||
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
|
|
@ -75,16 +75,19 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
|
|||
}
|
||||
|
||||
func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
// In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer
|
||||
// in case we need to wait for a leader election.
|
||||
return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
// BlockingTimeout implements pool.BlockableQuery
|
||||
func (q *QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
maxTime := structs.DurationFromProto(q.MaxQueryTime)
|
||||
o := structs.QueryOptions{
|
||||
MaxQueryTime: maxTime,
|
||||
MinQueryIndex: q.MinQueryIndex,
|
||||
}
|
||||
return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime)
|
||||
return o.BlockingTimeout(maxQueryTime, defaultQueryTime)
|
||||
}
|
||||
|
||||
// SetFilter is needed to implement the structs.QueryOptionsCompat interface
|
||||
|
@ -118,12 +121,7 @@ func (w *WriteRequest) AllowStaleRead() bool {
|
|||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// IsRead implements structs.RPCInfo
|
||||
|
@ -148,13 +146,8 @@ func (r *ReadRequest) SetTokenSecret(token string) {
|
|||
}
|
||||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// RequestDatacenter implements structs.RPCInfo
|
||||
|
|
|
@ -33,12 +33,6 @@ func (msg *PeeringReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.
|
|||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *PeeringReadRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
// TODO(peering): figure out read semantics here
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *PeeringReadRequest) SetTokenSecret(s string) {
|
||||
// TODO(peering): figure out read semantics here
|
||||
|
@ -77,12 +71,6 @@ func (msg *PeeringListRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.
|
|||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *PeeringListRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
// TODO(peering): figure out read semantics here
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *PeeringListRequest) SetTokenSecret(s string) {
|
||||
// TODO(peering): figure out read semantics here
|
||||
|
|
|
@ -32,13 +32,8 @@ func (req *SubscribeRequest) SetTokenSecret(token string) {
|
|||
}
|
||||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
}
|
||||
|
||||
// EnterpriseMeta returns the EnterpriseMeta encoded in the request's Subject.
|
||||
|
|
|
@ -541,6 +541,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
|||
- `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`.
|
||||
- `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`.
|
||||
- `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`.
|
||||
- `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`.
|
||||
- `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.
|
||||
- `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting.
|
||||
- `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate.
|
||||
|
@ -610,7 +611,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
|||
backwards-compatibility, but TLS support is deprecated and will be removed in a future
|
||||
release. Refer to `grpc_tls` for more information on configuring a TLS-enabled port.
|
||||
- `grpc_tls` ((#grpc_tls_port)) - The gRPC API with TLS connections, -1 to disable. Default -1 (disabled).
|
||||
**We recommend using `8502` for `grpc_tls`** as your conventional gRPC port number, as it allows some
|
||||
**We recommend using `8502` for `grpc_tls`** as your conventional gRPC port number, as it allows some
|
||||
tools to work automatically. `grpc_tls` is always guaranteed to be encrypted. Both `grpc` and `grpc_tls`
|
||||
can be configured at the same time, but they may not utilize the same port number. If both `grpc` and
|
||||
`grpc_tls` are defined, then `grpc` will always be plaintext. This field was added in Consul 1.14.
|
||||
|
|
Loading…
Reference in New Issue