adding config for request_limits (#15531)

* server: add placeholder glue for rate limit handler

This commit adds a no-op implementation of the rate-limit handler and
adds it to the `consul.Server` struct and setup code.

This allows us to start working on the net/rpc and gRPC interceptors and
config logic.

* Add handler errors

* Set the global read and write limits

* fixing multilimiter moving packages

* Fix typo

* Simplify globalLimit usage

* add multilimiter and tests

* exporting LimitedEntity

* Apply suggestions from code review

Co-authored-by: John Murret <john.murret@hashicorp.com>

* add config update and rename config params

* add doc string and split config

* Apply suggestions from code review

Co-authored-by: Dan Upton <daniel@floppy.co>

* use timer to avoid go routine leak and change the interface

* add comments to tests

* fix failing test

* add prefix with config edge, refactor tests

* Apply suggestions from code review

Co-authored-by: Dan Upton <daniel@floppy.co>

* refactor to apply configs for limiters under a prefix

* add fuzz tests and fix bugs found. Refactor reconcile loop to have a simpler logic

* make KeyType an exported type

* split the config and limiter trees to fix race conditions in config update

* rename variables

* fix race in test and remove dead code

* fix reconcile loop to not create a timer on each loop

* add extra benchmark tests and fix tests

* fix benchmark test to pass value to func

* server: add placeholder glue for rate limit handler

This commit adds a no-op implementation of the rate-limit handler and
adds it to the `consul.Server` struct and setup code.

This allows us to start working on the net/rpc and gRPC interceptors and
config logic.

* Set the global read and write limits

* fixing multilimiter moving packages

* add server configuration for global rate limiting.

* remove agent test

* remove added stuff from handler

* remove added stuff from multilimiter

* removing unnecessary TODOs

* Removing TODO comment from handler

* adding in defaulting to infinite

* add disabled status in there

* adding in documentation for disabled mode.

* make disabled the default.

* Add mock and agent test

* addig documentation and missing mock file.

* Fixing test TestLoad_IntegrationWithFlags

* updating docs based on PR feedback.

* Updating Request Limits mode to use int based on PR feedback.

* Adding RequestLimits struct so we have a nested struct in ReloadableConfig.

* fixing linting references

* Update agent/consul/rate/handler.go

Co-authored-by: Dan Upton <daniel@floppy.co>

* Update agent/consul/config.go

Co-authored-by: Dan Upton <daniel@floppy.co>

* removing the ignore of the request limits in JSON.  addingbuilder logic to convert any read rate or write rate less than 0 to rate.Inf

* added conversion function to convert request limits object to handler config.

* Updating docs to reflect gRPC and RPC are rate limit and as a result, HTTP requests are as well.

* Updating values for TestLoad_FullConfig() so that they were different and discernable.

* Updating TestRuntimeConfig_Sanitize

* Fixing TestLoad_IntegrationWithFlags test

* putting nil check in place

* fixing rebase

* removing change for missing error checks.  will put in another PR

* Rebasing after default multilimiter config change

* resolving rebase issues

* updating reference for incomingRPCLimiter to use interface

* updating interface

* Updating interfaces

* Fixing mock reference

Co-authored-by: Daniel Upton <daniel@floppy.co>
Co-authored-by: Dhia Ayachi <dhia@hashicorp.com>
This commit is contained in:
John Murret 2022-12-13 13:09:55 -07:00 committed by GitHub
parent 233dbcb67f
commit e027c94b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 601 additions and 179 deletions

View File

@ -40,6 +40,7 @@ import (
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/servercert"
"github.com/hashicorp/consul/agent/dns"
external "github.com/hashicorp/consul/agent/grpc-external"
@ -564,7 +565,8 @@ func (a *Agent) Start(ctx context.Context) error {
}
// gRPC calls are only rate-limited on server, not client agents.
grpcRateLimiter := middleware.NullRateLimiter()
var grpcRateLimiter rpcRate.RequestLimitsHandler
grpcRateLimiter = rpcRate.NullRateLimiter()
if s, ok := a.delegate.(*consul.Server); ok {
grpcRateLimiter = s.IncomingRPCLimiter()
}
@ -1479,6 +1481,10 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.PeeringEnabled = runtimeCfg.PeeringEnabled
cfg.PeeringTestAllowPeerRegistrations = runtimeCfg.PeeringTestAllowPeerRegistrations
cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String()
cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate
cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate
enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil
}
@ -4034,17 +4040,18 @@ func (a *Agent) reloadConfig(autoReload bool) error {
{a.config.TLS.HTTPS, newCfg.TLS.HTTPS},
} {
if f.oldCfg.KeyFile != f.newCfg.KeyFile {
err = a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile)
a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile)
if err != nil {
return err
}
}
if f.oldCfg.CertFile != f.newCfg.CertFile {
err = a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile)
a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile)
if err != nil {
return err
}
}
if revertStaticConfig(f.oldCfg, f.newCfg) {
a.logger.Warn("Changes to your configuration were detected that for security reasons cannot be automatically applied by 'auto_reload_config'. Manually reload your configuration (e.g. with 'consul reload') to apply these changes.", "StaticRuntimeConfig", f.oldCfg, "StaticRuntimeConfig From file", f.newCfg)
}
@ -4145,6 +4152,11 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
}
cc := consul.ReloadableConfig{
RequestLimits: &consul.RequestLimits{
Mode: newCfg.RequestLimitsMode,
ReadRate: newCfg.RequestLimitsReadRate,
WriteRate: newCfg.RequestLimitsWriteRate,
},
RPCClientTimeout: newCfg.RPCClientTimeout,
RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst,

View File

@ -4243,6 +4243,28 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs)
}
func TestAgent_consulConfig_RequestLimits(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
hcl := `
limits {
request_limits {
mode = "enforcing"
read_rate = 8888
write_rate = 9999
}
}
`
a := NewTestAgent(t, hcl)
defer a.Shutdown()
require.Equal(t, "enforcing", a.consulConfig().RequestLimitsMode)
require.Equal(t, rate.Limit(8888), a.consulConfig().RequestLimitsReadRate)
require.Equal(t, rate.Limit(9999), a.consulConfig().RequestLimitsWriteRate)
}
func TestAgent_grpcInjectAddr(t *testing.T) {
tt := []struct {
name string

View File

@ -32,6 +32,7 @@ import (
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
@ -1045,6 +1046,9 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RequestLimitsMode: b.requestsLimitsModeVal(stringVal(c.Limits.RequestLimits.Mode)),
RequestLimitsReadRate: limitVal(c.Limits.RequestLimits.ReadRate),
RequestLimitsWriteRate: limitVal(c.Limits.RequestLimits.WriteRate),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
@ -1778,6 +1782,19 @@ func (b *builder) dnsRecursorStrategyVal(v string) dns.RecursorStrategy {
return out
}
func (b *builder) requestsLimitsModeVal(v string) consulrate.Mode {
var out consulrate.Mode
mode, ok := consulrate.RequestLimitsModeFromName(v)
if !ok {
b.err = multierror.Append(b.err, fmt.Errorf("limits.request_limits.mode: invalid mode: %q", v))
} else {
out = mode
}
return out
}
func (b *builder) exposeConfVal(v *ExposeConfig) structs.ExposeConfig {
var out structs.ExposeConfig
if v == nil {
@ -1993,6 +2010,15 @@ func float64Val(v *float64) float64 {
return float64ValWithDefault(v, 0)
}
func limitVal(v *float64) rate.Limit {
f := float64Val(v)
if f < 0 {
return rate.Inf
}
return rate.Limit(f)
}
func (b *builder) cidrsVal(name string, v []string) (nets []*net.IPNet) {
if v == nil {
return

View File

@ -712,16 +712,23 @@ type UnixSocket struct {
User *string `mapstructure:"user"`
}
type RequestLimits struct {
Mode *string `mapstructure:"mode"`
ReadRate *float64 `mapstructure:"read_rate"`
WriteRate *float64 `mapstructure:"write_rate"`
}
type Limits struct {
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
RPCRate *float64 `mapstructure:"rpc_rate"`
KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"`
TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"`
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RequestLimits RequestLimits `mapstructure:"request_limits"`
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`
RPCRate *float64 `mapstructure:"rpc_rate"`
KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"`
TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"`
}
type Segment struct {

View File

@ -97,6 +97,11 @@ func DefaultSource() Source {
limits = {
http_max_conns_per_client = 200
https_handshake_timeout = "5s"
request_limits = {
mode = "disabled"
read_rate = -1
write_rate = -1
}
rpc_handshake_timeout = "5s"
rpc_client_timeout = "60s"
rpc_rate = -1

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/structs"
@ -923,14 +924,14 @@ type RuntimeConfig struct {
// See https://en.wikipedia.org/wiki/Token_bucket for more about token
// buckets.
//
// hcl: limit { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int }
// hcl: limits { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int }
RPCRateLimit rate.Limit
RPCMaxBurst int
// RPCMaxConnsPerClient limits the number of concurrent TCP connections the
// RPC server will accept from any single source IP address.
//
// hcl: limits{ rpc_max_conns_per_client = 100 }
// hcl: limits { rpc_max_conns_per_client = 100 }
RPCMaxConnsPerClient int
// RPCProtocol is the Consul protocol version to use.
@ -1009,6 +1010,37 @@ type RuntimeConfig struct {
// flag: -rejoin
RejoinAfterLeave bool
// RequestLimitsMode will disable or enable rate limiting. If not disabled, it
// enforces the action that will occur when RequestLimitsReadRate
// or RequestLimitsWriteRate is exceeded. The default value of "disabled" will
// prevent any rate limiting from occuring. A value of "enforce" will block
// the request from processings by returning an error. A value of
// "permissive" will not block the request and will allow the request to
// continue processing.
//
// hcl: limits { request_limits { mode = "permissive" } }
RequestLimitsMode consulrate.Mode
// RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP
// queries are allowed to happen. In any large enough time interval, rate
// limiter limits the rate to RequestLimitsReadRate tokens per second.
//
// See https://en.wikipedia.org/wiki/Token_bucket for more about token
// buckets.
//
// hcl: limits { request_limits { read_rate = (float64|MaxFloat64) } }
RequestLimitsReadRate rate.Limit
// RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP
// writes are allowed to happen. In any large enough time interval, rate
// limiter limits the rate to RequestLimitsWriteRate tokens per second.
//
// See https://en.wikipedia.org/wiki/Token_bucket for more about token
// buckets.
//
// hcl: limits { request_limits { write_rate = (float64|MaxFloat64) } }
RequestLimitsWriteRate rate.Limit
// RetryJoinIntervalLAN specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.

View File

@ -20,6 +20,7 @@ import (
"github.com/armon/go-metrics/prometheus"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
@ -27,6 +28,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
@ -4615,6 +4617,9 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
rt.HTTPSHandshakeTimeout = 5 * time.Second
rt.HTTPMaxConnsPerClient = 200
rt.RPCMaxConnsPerClient = 100
rt.RequestLimitsMode = consulrate.ModeDisabled
rt.RequestLimitsReadRate = rate.Inf
rt.RequestLimitsWriteRate = rate.Inf
rt.SegmentLimit = 64
rt.XDSUpdateRateLimit = 250
},
@ -6163,6 +6168,9 @@ func TestLoad_FullConfig(t *testing.T) {
RaftTrailingLogs: 83749,
ReconnectTimeoutLAN: 23739 * time.Second,
ReconnectTimeoutWAN: 26694 * time.Second,
RequestLimitsMode: consulrate.ModePermissive,
RequestLimitsReadRate: 99.0,
RequestLimitsWriteRate: 101.0,
RejoinAfterLeave: true,
RetryJoinIntervalLAN: 8067 * time.Second,
RetryJoinIntervalWAN: 28866 * time.Second,

View File

@ -278,6 +278,9 @@
"ReconnectTimeoutLAN": "0s",
"ReconnectTimeoutWAN": "0s",
"RejoinAfterLeave": false,
"RequestLimitsMode": 0,
"RequestLimitsReadRate": 0,
"RequestLimitsWriteRate": 0,
"RetryJoinIntervalLAN": "0s",
"RetryJoinIntervalWAN": "0s",
"RetryJoinLAN": [

View File

@ -305,6 +305,11 @@ limits {
rpc_max_conns_per_client = 2954
kv_max_value_size = 1234567800
txn_max_req_len = 567800000
request_limits {
mode = "permissive"
read_rate = 99.0
write_rate = 101.0
}
}
log_level = "k1zo9Spt"
log_json = true

View File

@ -9,25 +9,25 @@
"acl_replication_token": "LMmgy5dO",
"acl_token": "O1El0wan",
"acl_ttl": "18060s",
"acl" : {
"enabled" : true,
"down_policy" : "03eb2aee",
"default_policy" : "72c2e7a0",
"acl": {
"enabled": true,
"down_policy": "03eb2aee",
"default_policy": "72c2e7a0",
"enable_key_list_policy": true,
"enable_token_persistence": true,
"policy_ttl": "1123s",
"role_ttl": "9876s",
"token_ttl": "3321s",
"enable_token_replication" : true,
"enable_token_replication": true,
"msp_disable_bootstrap": true,
"tokens" : {
"master" : "8a19ac27",
"initial_management" : "3820e09a",
"agent_master" : "64fd0e08",
"agent_recovery" : "1dba6aba",
"replication" : "5795983a",
"agent" : "bed2377c",
"default" : "418fdff1",
"tokens": {
"master": "8a19ac27",
"initial_management": "3820e09a",
"agent_master": "64fd0e08",
"agent_recovery": "1dba6aba",
"replication": "5795983a",
"agent": "bed2377c",
"default": "418fdff1",
"managed_service_provider": [
{
"accessor_id": "first",
@ -57,9 +57,15 @@
"enabled": false,
"intro_token": "OpBPGRwt",
"intro_token_file": "gFvAXwI8",
"dns_sans": ["6zdaWg9J"],
"ip_sans": ["198.18.99.99"],
"server_addresses": ["198.18.100.1"],
"dns_sans": [
"6zdaWg9J"
],
"ip_sans": [
"198.18.99.99"
],
"server_addresses": [
"198.18.100.1"
],
"authorization": {
"enabled": true,
"static": {
@ -71,9 +77,15 @@
"foo": "bar"
},
"bound_issuer": "consul",
"bound_audiences": ["consul-cluster-1"],
"claim_assertions": ["value.node == \"${node}\""],
"jwt_validation_pub_keys": ["-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----"]
"bound_audiences": [
"consul-cluster-1"
],
"claim_assertions": [
"value.node == \"${node}\""
],
"jwt_validation_pub_keys": [
"-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----"
]
}
}
},
@ -82,7 +94,7 @@
"disable_upgrade_migration": true,
"last_contact_threshold": "12705s",
"max_trailing_logs": 17849,
"min_quorum": 3,
"min_quorum": 3,
"redundancy_zone_tag": "3IsufDJf",
"server_stabilization_time": "23057s",
"upgrade_version_tag": "W9pDwFAL"
@ -104,11 +116,20 @@
"service_id": "L8G0QNmR",
"token": "oo4BCTgJ",
"status": "qLykAl5u",
"args": ["f3BemRjy", "e5zgpef7"],
"args": [
"f3BemRjy",
"e5zgpef7"
],
"http": "29B93haH",
"header": {
"hBq0zn1q": [ "2a9o9ZKP", "vKwA5lR6" ],
"f3r6xFtM": [ "RyuIdDWv", "QbxEcIUM" ]
"hBq0zn1q": [
"2a9o9ZKP",
"vKwA5lR6"
],
"f3r6xFtM": [
"RyuIdDWv",
"QbxEcIUM"
]
},
"method": "Dou0nGT5",
"body": "5PBQd2OT",
@ -134,11 +155,20 @@
"service_id": "lSulPcyz",
"token": "toO59sh8",
"status": "9RlWsXMV",
"args": ["4BAJttck", "4D2NPtTQ"],
"args": [
"4BAJttck",
"4D2NPtTQ"
],
"http": "dohLcyQ2",
"header": {
"ZBfTin3L": [ "1sDbEqYG", "lJGASsWK" ],
"Ui0nU99X": [ "LMccm3Qe", "k5H5RggQ" ]
"ZBfTin3L": [
"1sDbEqYG",
"lJGASsWK"
],
"Ui0nU99X": [
"LMccm3Qe",
"k5H5RggQ"
]
},
"method": "aldrIQ4l",
"body": "wSjTy7dg",
@ -163,11 +193,20 @@
"service_id": "CmUUcRna",
"token": "a3nQzHuy",
"status": "irj26nf3",
"args": ["9s526ogY", "gSlOHj1w"],
"args": [
"9s526ogY",
"gSlOHj1w"
],
"http": "yzhgsQ7Y",
"header": {
"zcqwA8dO": [ "qb1zx0DL", "sXCxPFsD" ],
"qxvdnSE9": [ "6wBPUYdF", "YYh8wtSZ" ]
"zcqwA8dO": [
"qb1zx0DL",
"sXCxPFsD"
],
"qxvdnSE9": [
"6wBPUYdF",
"YYh8wtSZ"
]
},
"method": "gLrztrNw",
"body": "0jkKgGUC",
@ -202,15 +241,21 @@
},
"auto_encrypt": {
"tls": false,
"dns_san": ["a.com", "b.com"],
"ip_san": ["192.168.4.139", "192.168.4.140"],
"dns_san": [
"a.com",
"b.com"
],
"ip_san": [
"192.168.4.139",
"192.168.4.140"
],
"allow_tls": true
},
"cloud": {
"resource_id": "N43DsscE",
"client_id": "6WvsDZCP",
"client_secret": "lCSMHOpB",
"hostname": "DH4bh7aC",
"client_secret": "lCSMHOpB",
"hostname": "DH4bh7aC",
"auth_url": "332nCdR2",
"scada_address": "aoeusth232"
},
@ -226,21 +271,21 @@
"enable_mesh_gateway_wan_federation": false,
"enabled": true
},
"gossip_lan" : {
"gossip_lan": {
"gossip_nodes": 6,
"gossip_interval" : "25252s",
"retransmit_mult" : 1234,
"suspicion_mult" : 1235,
"probe_interval" : "101ms",
"probe_timeout" : "102ms"
"gossip_interval": "25252s",
"retransmit_mult": 1234,
"suspicion_mult": 1235,
"probe_interval": "101ms",
"probe_timeout": "102ms"
},
"gossip_wan" : {
"gossip_nodes" : 2,
"gossip_interval" : "6966s",
"retransmit_mult" : 16384,
"suspicion_mult" : 16385,
"probe_interval" : "103ms",
"probe_timeout" : "104ms"
"gossip_wan": {
"gossip_nodes": 2,
"gossip_interval": "6966s",
"retransmit_mult": 16384,
"suspicion_mult": 16385,
"probe_interval": "103ms",
"probe_timeout": "104ms"
},
"datacenter": "rzo029wg",
"default_query_time": "16743s",
@ -283,8 +328,15 @@
"encrypt_verify_incoming": true,
"encrypt_verify_outgoing": true,
"http_config": {
"block_endpoints": [ "RBvAFcGD", "fWOWFznh" ],
"allow_write_http_from": [ "127.0.0.1/8", "22.33.44.55/32", "0.0.0.0/0" ],
"block_endpoints": [
"RBvAFcGD",
"fWOWFznh"
],
"allow_write_http_from": [
"127.0.0.1/8",
"22.33.44.55/32",
"0.0.0.0/0"
],
"response_headers": {
"M6TKa9NP": "xjuxjOzQ",
"JRCrHZed": "rl0mTx81"
@ -304,7 +356,12 @@
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,
"kv_max_value_size": 1234567800,
"txn_max_req_len": 567800000
"txn_max_req_len": 567800000,
"request_limits": {
"mode": "permissive",
"read_rate": 99.0,
"write_rate": 101.0
}
},
"log_level": "k1zo9Spt",
"log_json": true,
@ -340,7 +397,10 @@
},
"protocol": 30793,
"primary_datacenter": "ejtmd43d",
"primary_gateways": [ "aej8eeZo", "roh2KahS" ],
"primary_gateways": [
"aej8eeZo",
"roh2KahS"
],
"primary_gateways_interval": "18866s",
"raft_protocol": 3,
"raft_snapshot_threshold": 16384,
@ -352,15 +412,26 @@
"read_replica": true,
"reconnect_timeout": "23739s",
"reconnect_timeout_wan": "26694s",
"recursors": [ "63.38.39.58", "92.49.18.18" ],
"recursors": [
"63.38.39.58",
"92.49.18.18"
],
"rejoin_after_leave": true,
"retry_interval": "8067s",
"retry_interval_wan": "28866s",
"retry_join": [ "pbsSFY7U", "l0qLtWij" ],
"retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ],
"retry_join": [
"pbsSFY7U",
"l0qLtWij"
],
"retry_join_wan": [
"PFsR02Ye",
"rJdQIhER"
],
"retry_max": 913,
"retry_max_wan": 23160,
"rpc": {"enable_streaming": true},
"rpc": {
"enable_streaming": true
},
"segment_limit": 123,
"serf_lan": "99.43.63.15",
"serf_wan": "67.88.33.19",
@ -382,7 +453,10 @@
"port": 6109
}
},
"tags": ["nkwshvM5", "NTDWn3ek"],
"tags": [
"nkwshvM5",
"NTDWn3ek"
],
"address": "cOlSOhbp",
"token": "msy7iWER",
"port": 24237,
@ -396,11 +470,19 @@
"name": "iehanzuq",
"status": "rCvn53TH",
"notes": "fti5lfF3",
"args": ["16WRUmwS", "QWk7j7ae"],
"args": [
"16WRUmwS",
"QWk7j7ae"
],
"http": "dl3Fgme3",
"header": {
"rjm4DEd3": ["2m3m2Fls"],
"l4HwQ112": ["fk56MNlo", "dhLK56aZ"]
"rjm4DEd3": [
"2m3m2Fls"
],
"l4HwQ112": [
"fk56MNlo",
"dhLK56aZ"
]
},
"method": "9afLm3Mj",
"body": "wVVL2V6f",
@ -424,11 +506,20 @@
"name": "sgV4F7Pk",
"notes": "yP5nKbW0",
"status": "7oLMEyfu",
"args": ["5wEZtZpv", "0Ihyk8cS"],
"args": [
"5wEZtZpv",
"0Ihyk8cS"
],
"http": "KyDjGY9H",
"header": {
"gv5qefTz": [ "5Olo2pMG", "PvvKWQU5" ],
"SHOVq1Vv": [ "jntFhyym", "GYJh32pp" ]
"gv5qefTz": [
"5Olo2pMG",
"PvvKWQU5"
],
"SHOVq1Vv": [
"jntFhyym",
"GYJh32pp"
]
},
"method": "T66MFBfR",
"body": "OwGjTFQi",
@ -451,11 +542,20 @@
"name": "IEqrzrsd",
"notes": "SVqApqeM",
"status": "XXkVoZXt",
"args": ["wD05Bvao", "rLYB7kQC"],
"args": [
"wD05Bvao",
"rLYB7kQC"
],
"http": "kyICZsn8",
"header": {
"4ebP5vL4": [ "G20SrL5Q", "DwPKlMbo" ],
"p2UI34Qz": [ "UsG1D0Qh", "NHhRiB6s" ]
"4ebP5vL4": [
"G20SrL5Q",
"DwPKlMbo"
],
"p2UI34Qz": [
"UsG1D0Qh",
"NHhRiB6s"
]
},
"method": "ciYHWors",
"body": "lUVLGYU7",
@ -482,7 +582,10 @@
{
"id": "wI1dzxS4",
"name": "7IszXMQ1",
"tags": ["0Zwg8l6v", "zebELdN5"],
"tags": [
"0Zwg8l6v",
"zebELdN5"
],
"address": "9RhqPSPB",
"token": "myjKJkWH",
"port": 72219,
@ -492,11 +595,19 @@
"name": "atDGP7n5",
"status": "pDQKEhWL",
"notes": "Yt8EDLev",
"args": ["81EDZLPa", "bPY5X8xd"],
"args": [
"81EDZLPa",
"bPY5X8xd"
],
"http": "qzHYvmJO",
"header": {
"UkpmZ3a3": ["2dfzXuxZ"],
"cVFpko4u": ["gGqdEB6k", "9LsRo22u"]
"UkpmZ3a3": [
"2dfzXuxZ"
],
"cVFpko4u": [
"gGqdEB6k",
"9LsRo22u"
]
},
"method": "X5DrovFc",
"body": "WeikigLh",
@ -521,7 +632,10 @@
{
"id": "MRHVMZuD",
"name": "6L6BVfgH",
"tags": ["7Ale4y6o", "PMBW08hy"],
"tags": [
"7Ale4y6o",
"PMBW08hy"
],
"address": "R6H6g8h0",
"token": "ZgY8gjMI",
"port": 38292,
@ -536,11 +650,20 @@
"name": "9OOS93ne",
"notes": "CQy86DH0",
"status": "P0SWDvrk",
"args": ["EXvkYIuG", "BATOyt6h"],
"args": [
"EXvkYIuG",
"BATOyt6h"
],
"http": "u97ByEiW",
"header": {
"MUlReo8L": [ "AUZG7wHG", "gsN0Dc2N" ],
"1UJXjVrT": [ "OJgxzTfk", "xZZrFsq7" ]
"MUlReo8L": [
"AUZG7wHG",
"gsN0Dc2N"
],
"1UJXjVrT": [
"OJgxzTfk",
"xZZrFsq7"
]
},
"method": "5wkAxCUE",
"body": "7CRjCJyz",
@ -621,8 +744,8 @@
"destination_namespace": "9nakw0td",
"destination_partition": "part-9nakw0td",
"destination_type": "prepared_query",
"local_bind_socket_path": "/foo/bar/upstream",
"local_bind_socket_mode": "0600"
"local_bind_socket_path": "/foo/bar/upstream",
"local_bind_socket_mode": "0600"
}
]
}
@ -634,15 +757,21 @@
"port": 27147,
"proxy": {
"config": {
"1CuJHVfw" : "Kzqsa7yc"
"1CuJHVfw": "Kzqsa7yc"
}
}
}
],
"session_ttl_min": "26627s",
"skip_leave_on_interrupt": true,
"start_join": [ "LR3hGDoG", "MwVpZ4Up" ],
"start_join_wan": [ "EbFSc3nA", "kwXTh623" ],
"start_join": [
"LR3hGDoG",
"MwVpZ4Up"
],
"start_join_wan": [
"EbFSc3nA",
"kwXTh623"
],
"syslog_facility": "hHv79Uia",
"tagged_addresses": {
"7MYgHrYH": "dALJAhLD",
@ -664,10 +793,16 @@
"circonus_submission_url": "gTcbS93G",
"disable_hostname": true,
"dogstatsd_addr": "0wSndumK",
"dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ],
"dogstatsd_tags": [
"3N81zSUB",
"Xtj8AnXZ"
],
"retry_failed_connection": true,
"filter_default": true,
"prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ],
"prefix_filter": [
"+oJotS8XJ",
"-cazlEhGn"
],
"metrics_prefix": "ftO6DySn",
"prometheus_retention_time": "15s",
"statsd_address": "drce87cy",
@ -722,7 +857,10 @@
"dir": "pVncV4Ey",
"content_path": "qp1WRhYH",
"metrics_provider": "sgnaoa_lower_case",
"metrics_provider_files": ["sgnaMFoa", "dicnwkTH"],
"metrics_provider_files": [
"sgnaMFoa",
"dicnwkTH"
],
"metrics_provider_options_json": "{\"DIbVQadX\": 1}",
"metrics_proxy": {
"base_url": "http://foo.bar",
@ -732,7 +870,10 @@
"value": "TYBgnN2F"
}
],
"path_allowlist": ["/aSh3cu", "/eiK/2Th"]
"path_allowlist": [
"/aSh3cu",
"/eiK/2Th"
]
},
"dashboard_url_templates": {
"u2eziu2n_lower_case": "http://lkjasd.otr"
@ -754,11 +895,15 @@
"datacenter": "GyE6jpeW",
"key": "j9lF1Tve",
"handler": "90N7S4LN"
}, {
},
{
"type": "keyprefix",
"datacenter": "fYrl3F5d",
"key": "sl3Dffu7",
"args": ["dltjDJ2a", "flEa7C2d"]
"args": [
"dltjDJ2a",
"flEa7C2d"
]
}
],
"xds": {

View File

@ -12,6 +12,7 @@ import (
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/checks"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/structs"
libserf "github.com/hashicorp/consul/lib/serf"
"github.com/hashicorp/consul/tlsutil"
@ -318,6 +319,25 @@ type Config struct {
// CheckOutputMaxSize control the max size of output of checks
CheckOutputMaxSize int
// RequestLimitsMode will disable or enable rate limiting. If not disabled, it
// enforces the action that will occur when RequestLimitsReadRate
// or RequestLimitsWriteRate is exceeded. The default value of "disabled" will
// prevent any rate limiting from occuring. A value of "enforce" will block
// the request from processings by returning an error. A value of
// "permissive" will not block the request and will allow the request to
// continue processing.
RequestLimitsMode string
// RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP
// queries are allowed to happen. In any large enough time interval, rate
// limiter limits the rate to RequestLimitsReadRate tokens per second.
RequestLimitsReadRate rate.Limit
// RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP
// writes are allowed to happen. In any large enough time interval, rate
// limiter limits the rate to RequestLimitsWriteRate tokens per second.
RequestLimitsWriteRate rate.Limit
// RPCHandshakeTimeout limits how long we will wait for the initial magic byte
// on an RPC client connection. It also governs how long we will wait for a
// TLS handshake when TLS is configured however the timout applies separately
@ -501,6 +521,10 @@ func DefaultConfig() *Config {
CheckOutputMaxSize: checks.DefaultBufSize,
RequestLimitsMode: "disabled",
RequestLimitsReadRate: rate.Inf, // ops / sec
RequestLimitsWriteRate: rate.Inf, // ops / sec
RPCRateLimit: rate.Inf,
RPCMaxBurst: 1000,
@ -620,9 +644,18 @@ type RPCConfig struct {
EnableStreaming bool
}
// RequestLimits is configuration for serverrate limiting that is a part of
// ReloadableConfig.
type RequestLimits struct {
Mode consulrate.Mode
ReadRate rate.Limit
WriteRate rate.Limit
}
// ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded.
type ReloadableConfig struct {
RequestLimits *RequestLimits
RPCClientTimeout time.Duration
RPCRateLimit rate.Limit
RPCMaxBurst int

View File

@ -3,12 +3,11 @@ package multilimiter
import (
"bytes"
"context"
radix "github.com/hashicorp/go-immutable-radix"
"golang.org/x/time/rate"
"sync"
"sync/atomic"
"time"
radix "github.com/hashicorp/go-immutable-radix"
"golang.org/x/time/rate"
)
var _ RateLimiter = &MultiLimiter{}

View File

@ -30,14 +30,51 @@ var (
type Mode int
const (
// ModeDisabled causes rate limiting to be bypassed.
ModeDisabled Mode = iota
// ModePermissive causes the handler to log the rate-limited operation but
// still allow it to proceed.
ModePermissive Mode = iota
ModePermissive
// ModeEnforcing causes the handler to reject the rate-limted operation.
// ModeEnforcing causes the handler to reject the rate-limited operation.
ModeEnforcing
)
var modeToName = map[Mode]string{
ModeDisabled: "disabled",
ModeEnforcing: "enforcing",
ModePermissive: "permissive",
}
var modeFromName = func() map[string]Mode {
vals := map[string]Mode{
"": ModeDisabled,
}
for k, v := range modeToName {
vals[v] = k
}
return vals
}()
func (m Mode) String() string {
return modeToName[m]
}
// RequestLimitsModeFromName will unmarshal the string form of a configMode.
func RequestLimitsModeFromName(name string) (Mode, bool) {
s, ok := modeFromName[name]
return s, ok
}
// RequestLimitsModeFromNameWithDefault will unmarshal the string form of a configMode.
func RequestLimitsModeFromNameWithDefault(name string) Mode {
s, ok := modeFromName[name]
if !ok {
return ModePermissive
}
return s
}
// OperationType is the type of operation the client is attempting to perform.
type OperationType int
@ -61,6 +98,13 @@ type Operation struct {
Type OperationType
}
//go:generate mockery --name RequestLimitsHandler --inpackage --filename mock_RequestLimitsHandler_test.go
type RequestLimitsHandler interface {
Run(ctx context.Context)
Allow(op Operation) error
UpdateConfig(cfg HandlerConfig)
}
// Handler enforces rate limits for incoming RPCs.
type Handler struct {
cfg *atomic.Pointer[HandlerConfig]
@ -127,7 +171,6 @@ func (h *Handler) Allow(op Operation) error {
return nil
}
// TODO(NET-1379): call this on `consul reload`.
func (h *Handler) UpdateConfig(cfg HandlerConfig) {
h.cfg.Store(&cfg)
h.limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite)
@ -149,3 +192,16 @@ type globalLimit []byte
func (prefix globalLimit) Key() multilimiter.KeyType {
return multilimiter.Key(prefix, nil)
}
// NullRateLimiter returns a RateLimiter that allows every operation.
func NullRateLimiter() RequestLimitsHandler {
return nullRateLimiter{}
}
type nullRateLimiter struct{}
func (nullRateLimiter) Allow(Operation) error { return nil }
func (nullRateLimiter) Run(ctx context.Context) {}
func (nullRateLimiter) UpdateConfig(cfg HandlerConfig) {}

View File

@ -0,0 +1,53 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
package rate
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockRequestLimitsHandler is an autogenerated mock type for the RequestLimitsHandler type
type MockRequestLimitsHandler struct {
mock.Mock
}
// Allow provides a mock function with given fields: op
func (_m *MockRequestLimitsHandler) Allow(op Operation) error {
ret := _m.Called(op)
var r0 error
if rf, ok := ret.Get(0).(func(Operation) error); ok {
r0 = rf(op)
} else {
r0 = ret.Error(0)
}
return r0
}
// Run provides a mock function with given fields: ctx
func (_m *MockRequestLimitsHandler) Run(ctx context.Context) {
_m.Called(ctx)
}
// UpdateConfig provides a mock function with given fields: cfg
func (_m *MockRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) {
_m.Called(cfg)
}
type mockConstructorTestingTNewMockRequestLimitsHandler interface {
mock.TestingT
Cleanup(func())
}
// NewMockRequestLimitsHandler creates a new instance of MockRequestLimitsHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRequestLimitsHandler(t mockConstructorTestingTNewMockRequestLimitsHandler) *MockRequestLimitsHandler {
mock := &MockRequestLimitsHandler{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -5,7 +5,6 @@ import (
"crypto/x509"
"errors"
"fmt"
"github.com/hashicorp/consul/agent/consul/multilimiter"
"io"
"net"
"os"
@ -35,6 +34,7 @@ import (
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/multilimiter"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
@ -144,6 +144,8 @@ const (
PoolKindSegment = "segment"
)
const requestLimitsBurstMultiplier = 10
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
@ -279,7 +281,7 @@ type Server struct {
rpcServer *rpc.Server
// incomingRPCLimiter rate-limits incoming net/rpc and gRPC calls.
incomingRPCLimiter *rpcRate.Handler
incomingRPCLimiter rpcRate.RequestLimitsHandler
// insecureRPCServer is a RPC server that is configure with
// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
@ -396,6 +398,7 @@ type Server struct {
EnterpriseServer
operatorServer *operator.Server
}
type connHandler interface {
Run() error
Handle(conn net.Conn)
@ -469,12 +472,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
})
// TODO(NET-1380, NET-1381): thread this into the net/rpc and gRPC interceptors.
s.incomingRPCLimiter = rpcRate.NewHandler(rpcRate.HandlerConfig{
// TODO(server-rate-limit): revisit those value based on the multilimiter final implementation
Config: multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second},
// TODO(NET-1379): pass in _real_ configuration.
GlobalMode: rpcRate.ModePermissive,
}, s)
if s.incomingRPCLimiter == nil {
mlCfg := &multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second}
limitsConfig := &RequestLimits{
Mode: rpcRate.RequestLimitsModeFromNameWithDefault(config.RequestLimitsMode),
ReadRate: config.RequestLimitsReadRate,
WriteRate: config.RequestLimitsWriteRate,
}
s.incomingRPCLimiter = rpcRate.NewHandler(*s.convertConsulConfigToRateLimitHandlerConfig(*limitsConfig, mlCfg), s)
}
s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
var recorder *middleware.RequestRecorder
@ -1680,6 +1687,11 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error {
}
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
if config.RequestLimits != nil {
s.incomingRPCLimiter.UpdateConfig(*s.convertConsulConfigToRateLimitHandlerConfig(*config.RequestLimits, nil))
}
s.rpcConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
})
@ -1830,10 +1842,31 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
}
}
// convertConsulConfigToRateLimitHandlerConfig creates a rate limite handler config
// from the relevant fields in the consul runtime config.
func (s *Server) convertConsulConfigToRateLimitHandlerConfig(limitsConfig RequestLimits, multilimiterConfig *multilimiter.Config) *rpcRate.HandlerConfig {
hc := &rpcRate.HandlerConfig{
GlobalMode: limitsConfig.Mode,
GlobalReadConfig: multilimiter.LimiterConfig{
Rate: limitsConfig.ReadRate,
Burst: int(limitsConfig.ReadRate) * requestLimitsBurstMultiplier,
},
GlobalWriteConfig: multilimiter.LimiterConfig{
Rate: limitsConfig.WriteRate,
Burst: int(limitsConfig.WriteRate) * requestLimitsBurstMultiplier,
},
}
if multilimiterConfig != nil {
hc.Config = *multilimiterConfig
}
return hc
}
// IncomingRPCLimiter returns the server's configured rate limit handler for
// incoming RPCs. This is necessary because the external gRPC server is created
// by the agent (as it is also used for xDS).
func (s *Server) IncomingRPCLimiter() *rpcRate.Handler { return s.incomingRPCLimiter }
func (s *Server) IncomingRPCLimiter() rpcRate.RequestLimitsHandler { return s.incomingRPCLimiter }
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same

View File

@ -29,6 +29,8 @@ import (
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/multilimiter"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
external "github.com/hashicorp/consul/agent/grpc-external"
grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/metadata"
@ -331,7 +333,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
oldNotify()
}
}
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, grpcmiddleware.NullRateLimiter())
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRateLimiter())
srv, err := NewServer(c, deps, grpcServer)
if err != nil {
return nil, err
@ -1818,6 +1820,9 @@ func TestServer_ReloadConfig(t *testing.T) {
c.Build = "1.5.0"
c.RPCRateLimit = 500
c.RPCMaxBurst = 5000
c.RequestLimitsMode = "permissive"
c.RequestLimitsReadRate = 500
c.RequestLimitsWriteRate = 500
c.RPCClientTimeout = 60 * time.Second
// Set one raft param to be non-default in the initial config, others are
// default.
@ -1835,6 +1840,11 @@ func TestServer_ReloadConfig(t *testing.T) {
require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout())
rc := ReloadableConfig{
RequestLimits: &RequestLimits{
Mode: rpcRate.ModeEnforcing,
ReadRate: 1000,
WriteRate: 1100,
},
RPCClientTimeout: 2 * time.Minute,
RPCRateLimit: 1000,
RPCMaxBurst: 10000,
@ -1848,6 +1858,11 @@ func TestServer_ReloadConfig(t *testing.T) {
// Leave other raft fields default
}
mockHandler := rpcRate.NewMockRequestLimitsHandler(t)
mockHandler.On("UpdateConfig", mock.Anything).Return(func(cfg rpcRate.HandlerConfig) {})
s.incomingRPCLimiter = mockHandler
require.NoError(t, s.ReloadConfig(rc))
_, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMetaInDefaultPartition())
@ -1864,6 +1879,19 @@ func TestServer_ReloadConfig(t *testing.T) {
require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst())
// Check the incoming RPC rate limiter got updated
mockHandler.AssertCalled(t, "UpdateConfig", rpcRate.HandlerConfig{
GlobalMode: rc.RequestLimits.Mode,
GlobalReadConfig: multilimiter.LimiterConfig{
Rate: rc.RequestLimits.ReadRate,
Burst: int(rc.RequestLimits.ReadRate) * requestLimitsBurstMultiplier,
},
GlobalWriteConfig: multilimiter.LimiterConfig{
Rate: rc.RequestLimits.WriteRate,
Burst: int(rc.RequestLimits.WriteRate) * requestLimitsBurstMultiplier,
},
})
// Check RPC client timeout got updated
require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout())

View File

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"github.com/hashicorp/consul/agent/consul/rate"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/tlsutil"
)
@ -23,7 +24,7 @@ var (
// NewServer constructs a gRPC server for the external gRPC port, to which
// handlers can be registered.
func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter agentmiddleware.RateLimiter) *grpc.Server {
func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter rate.RequestLimitsHandler) *grpc.Server {
if metricsObj == nil {
metricsObj = metrics.Default()
}

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/rate"
grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
@ -23,7 +24,7 @@ import (
func TestServer_EmitsStats(t *testing.T) {
sink, metricsObj := testutil.NewFakeSink(t)
srv := NewServer(hclog.Default(), metricsObj, nil, grpcmiddleware.NullRateLimiter())
srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRateLimiter())
testservice.RegisterSimpleServer(srv, &testservice.Simple{})

View File

@ -11,6 +11,7 @@ import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/hashicorp/consul/agent/consul/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
@ -25,7 +26,7 @@ var (
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
// The register function will be called with the grpc.Server to register
// gRPC services with the server.
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter agentmiddleware.RateLimiter) *Handler {
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter rate.RequestLimitsHandler) *Handler {
if metricsObj == nil {
metricsObj = metrics.Default()
}

View File

@ -14,7 +14,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
@ -55,7 +55,7 @@ func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsC
func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(logger, addr, register, nil, middleware.NullRateLimiter())
handler := NewHandler(logger, addr, register, nil, rate.NullRateLimiter())
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

View File

@ -19,10 +19,10 @@ import (
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
@ -381,7 +381,7 @@ func runTestServer(t *testing.T, server *Server) net.Addr {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
},
nil,
middleware.NullRateLimiter(),
rate.NullRateLimiter(),
)
lis, err := net.Listen("tcp", "127.0.0.1:0")

View File

@ -14,7 +14,7 @@ import (
"github.com/hashicorp/go-hclog"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/proto/prototest"
@ -26,7 +26,7 @@ func TestHandler_EmitsStats(t *testing.T) {
sink, metricsObj := testutil.NewFakeSink(t)
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, middleware.NullRateLimiter())
handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, rate.NullRateLimiter())
testservice.RegisterSimpleServer(handler.srv, &testservice.Simple{})

View File

@ -1,39 +0,0 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
package middleware
import (
testing "testing"
rate "github.com/hashicorp/consul/agent/consul/rate"
mock "github.com/stretchr/testify/mock"
)
// MockRateLimiter is an autogenerated mock type for the RateLimiter type
type MockRateLimiter struct {
mock.Mock
}
// Allow provides a mock function with given fields: _a0
func (_m *MockRateLimiter) Allow(_a0 rate.Operation) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(rate.Operation) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewMockRateLimiter creates a new instance of MockRateLimiter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRateLimiter(t testing.TB) *MockRateLimiter {
mock := &MockRateLimiter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -17,7 +17,7 @@ import (
// ServerRateLimiterMiddleware implements a ServerInHandle function to perform
// RPC rate limiting at the cheapest possible point (before the full request has
// been decoded).
func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle {
func ServerRateLimiterMiddleware(limiter rate.RequestLimitsHandler, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle {
return func(ctx context.Context, info *tap.Info) (_ context.Context, retErr error) {
// This function is called before unary and stream RPC interceptors, so we
// must handle our own panics here.
@ -56,17 +56,3 @@ func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.Reco
}
}
}
//go:generate mockery --name RateLimiter --inpackage
type RateLimiter interface {
Allow(rate.Operation) error
}
// NullRateLimiter returns a RateLimiter that allows every operation.
func NullRateLimiter() RateLimiter {
return nullRateLimiter{}
}
type nullRateLimiter struct{}
func (nullRateLimiter) Allow(rate.Operation) error { return nil }

View File

@ -21,7 +21,7 @@ import (
)
func TestServerRateLimiterMiddleware_Integration(t *testing.T) {
limiter := NewMockRateLimiter(t)
limiter := rate.NewMockRequestLimitsHandler(t)
server := grpc.NewServer(
grpc.InTapHandle(ServerRateLimiterMiddleware(limiter, NewPanicHandler(hclog.NewNullLogger()))),

View File

@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
external "github.com/hashicorp/consul/agent/grpc-external"
@ -1591,7 +1592,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta()
deps := newDefaultDeps(t, conf)
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, agentmiddleware.NullRateLimiter())
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRateLimiter())
server, err := consul.NewServer(conf, deps, externalGRPCServer)
require.NoError(t, err)

View File

@ -46,7 +46,7 @@ There are four specific cases covered with increasing complexity:
- [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in
`agent/config/runtime_test.go`.
- [ ] If the config needs to be defaulted for the test server used in unit tests,
also add it to `DefaultConfig()` in `agent/consul/defaults.go`.
also add it to `DefaultConfig()` in `agent/consul/config.go`.
- [ ] **If** your config should take effect on a reload/HUP.
- [ ] Add necessary code to to trigger a safe (locked or atomic) update to
any state the feature needs changing. This needs to be added to one or

View File

@ -541,6 +541,10 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`.
- `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`.
- `request_limits` - This object povides configuration for rate limiting RPC and gRPC requests on the consul server. As a result of rate limiting gRPC and RPC request, HTTP requests to the Consul server are rate limited.
- `mode` - Configures whether rate limiting is enabled or not as well as how it behaves through the use of 3 possible modes. The default value of "disabled" will prevent any rate limiting from occuring. A value of "permissive" will cause the system to track requests against the `read_rate` and `write_rate` but will only log violations and will not block and will allow the request to continue processing. A value of "enforcing" also tracks requests against the `read_rate` and `write_rate` but in addition to logging violations, the system will block the request from processings by returning an error.
- `read_rate` - Configures how frequently RPC, gRPC, and HTTP queries are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
- `write_rate` - Configures how frequently RPC, gRPC, and HTTP write are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
- `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`.
- `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`.
- `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.