diff --git a/agent/agent.go b/agent/agent.go index 670ec8591a..3998ba3abb 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,6 +40,7 @@ import ( "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/servercert" "github.com/hashicorp/consul/agent/dns" external "github.com/hashicorp/consul/agent/grpc-external" @@ -564,7 +565,8 @@ func (a *Agent) Start(ctx context.Context) error { } // gRPC calls are only rate-limited on server, not client agents. - grpcRateLimiter := middleware.NullRateLimiter() + var grpcRateLimiter rpcRate.RequestLimitsHandler + grpcRateLimiter = rpcRate.NullRateLimiter() if s, ok := a.delegate.(*consul.Server); ok { grpcRateLimiter = s.IncomingRPCLimiter() } @@ -1479,6 +1481,10 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co cfg.PeeringEnabled = runtimeCfg.PeeringEnabled cfg.PeeringTestAllowPeerRegistrations = runtimeCfg.PeeringTestAllowPeerRegistrations + cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String() + cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate + cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate + enterpriseConsulConfig(cfg, runtimeCfg) return cfg, nil } @@ -4034,17 +4040,18 @@ func (a *Agent) reloadConfig(autoReload bool) error { {a.config.TLS.HTTPS, newCfg.TLS.HTTPS}, } { if f.oldCfg.KeyFile != f.newCfg.KeyFile { - err = a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile) + a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile) if err != nil { return err } } if f.oldCfg.CertFile != f.newCfg.CertFile { - err = a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile) + a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile) if err != nil { return err } } + if revertStaticConfig(f.oldCfg, f.newCfg) { a.logger.Warn("Changes to your configuration were detected that for security reasons cannot be automatically applied by 'auto_reload_config'. Manually reload your configuration (e.g. with 'consul reload') to apply these changes.", "StaticRuntimeConfig", f.oldCfg, "StaticRuntimeConfig From file", f.newCfg) } @@ -4145,6 +4152,11 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { } cc := consul.ReloadableConfig{ + RequestLimits: &consul.RequestLimits{ + Mode: newCfg.RequestLimitsMode, + ReadRate: newCfg.RequestLimitsReadRate, + WriteRate: newCfg.RequestLimitsWriteRate, + }, RPCClientTimeout: newCfg.RPCClientTimeout, RPCRateLimit: newCfg.RPCRateLimit, RPCMaxBurst: newCfg.RPCMaxBurst, diff --git a/agent/agent_test.go b/agent/agent_test.go index d32c4981d6..9d9f710a6f 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4243,6 +4243,28 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) { require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs) } +func TestAgent_consulConfig_RequestLimits(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + hcl := ` + limits { + request_limits { + mode = "enforcing" + read_rate = 8888 + write_rate = 9999 + } + } + ` + a := NewTestAgent(t, hcl) + defer a.Shutdown() + require.Equal(t, "enforcing", a.consulConfig().RequestLimitsMode) + require.Equal(t, rate.Limit(8888), a.consulConfig().RequestLimitsReadRate) + require.Equal(t, rate.Limit(9999), a.consulConfig().RequestLimitsWriteRate) +} + func TestAgent_grpcInjectAddr(t *testing.T) { tt := []struct { name string diff --git a/agent/config/builder.go b/agent/config/builder.go index f071bd206a..a28ddab2a7 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -32,6 +32,7 @@ import ( "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" @@ -1045,6 +1046,9 @@ func (b *builder) build() (rt RuntimeConfig, err error) { ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN), ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN), RejoinAfterLeave: boolVal(c.RejoinAfterLeave), + RequestLimitsMode: b.requestsLimitsModeVal(stringVal(c.Limits.RequestLimits.Mode)), + RequestLimitsReadRate: limitVal(c.Limits.RequestLimits.ReadRate), + RequestLimitsWriteRate: limitVal(c.Limits.RequestLimits.WriteRate), RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN), RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN), RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN), @@ -1778,6 +1782,19 @@ func (b *builder) dnsRecursorStrategyVal(v string) dns.RecursorStrategy { return out } +func (b *builder) requestsLimitsModeVal(v string) consulrate.Mode { + var out consulrate.Mode + + mode, ok := consulrate.RequestLimitsModeFromName(v) + if !ok { + b.err = multierror.Append(b.err, fmt.Errorf("limits.request_limits.mode: invalid mode: %q", v)) + } else { + out = mode + } + + return out +} + func (b *builder) exposeConfVal(v *ExposeConfig) structs.ExposeConfig { var out structs.ExposeConfig if v == nil { @@ -1993,6 +2010,15 @@ func float64Val(v *float64) float64 { return float64ValWithDefault(v, 0) } +func limitVal(v *float64) rate.Limit { + f := float64Val(v) + if f < 0 { + return rate.Inf + } + + return rate.Limit(f) +} + func (b *builder) cidrsVal(name string, v []string) (nets []*net.IPNet) { if v == nil { return diff --git a/agent/config/config.go b/agent/config/config.go index e199225414..186ebb10aa 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -712,16 +712,23 @@ type UnixSocket struct { User *string `mapstructure:"user"` } +type RequestLimits struct { + Mode *string `mapstructure:"mode"` + ReadRate *float64 `mapstructure:"read_rate"` + WriteRate *float64 `mapstructure:"write_rate"` +} + type Limits struct { - HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"` - HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"` - RPCClientTimeout *string `mapstructure:"rpc_client_timeout"` - RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"` - RPCMaxBurst *int `mapstructure:"rpc_max_burst"` - RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"` - RPCRate *float64 `mapstructure:"rpc_rate"` - KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"` - TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"` + HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"` + HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"` + RequestLimits RequestLimits `mapstructure:"request_limits"` + RPCClientTimeout *string `mapstructure:"rpc_client_timeout"` + RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"` + RPCMaxBurst *int `mapstructure:"rpc_max_burst"` + RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"` + RPCRate *float64 `mapstructure:"rpc_rate"` + KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"` + TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"` } type Segment struct { diff --git a/agent/config/default.go b/agent/config/default.go index e80a0d6c31..2615579c1f 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -97,6 +97,11 @@ func DefaultSource() Source { limits = { http_max_conns_per_client = 200 https_handshake_timeout = "5s" + request_limits = { + mode = "disabled" + read_rate = -1 + write_rate = -1 + } rpc_handshake_timeout = "5s" rpc_client_timeout = "60s" rpc_rate = -1 diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 199e2ac095..6d31f05432 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/dns" hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/structs" @@ -923,14 +924,14 @@ type RuntimeConfig struct { // See https://en.wikipedia.org/wiki/Token_bucket for more about token // buckets. // - // hcl: limit { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int } + // hcl: limits { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int } RPCRateLimit rate.Limit RPCMaxBurst int // RPCMaxConnsPerClient limits the number of concurrent TCP connections the // RPC server will accept from any single source IP address. // - // hcl: limits{ rpc_max_conns_per_client = 100 } + // hcl: limits { rpc_max_conns_per_client = 100 } RPCMaxConnsPerClient int // RPCProtocol is the Consul protocol version to use. @@ -1009,6 +1010,37 @@ type RuntimeConfig struct { // flag: -rejoin RejoinAfterLeave bool + // RequestLimitsMode will disable or enable rate limiting. If not disabled, it + // enforces the action that will occur when RequestLimitsReadRate + // or RequestLimitsWriteRate is exceeded. The default value of "disabled" will + // prevent any rate limiting from occuring. A value of "enforce" will block + // the request from processings by returning an error. A value of + // "permissive" will not block the request and will allow the request to + // continue processing. + // + // hcl: limits { request_limits { mode = "permissive" } } + RequestLimitsMode consulrate.Mode + + // RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP + // queries are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsReadRate tokens per second. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + // + // hcl: limits { request_limits { read_rate = (float64|MaxFloat64) } } + RequestLimitsReadRate rate.Limit + + // RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP + // writes are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsWriteRate tokens per second. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + // + // hcl: limits { request_limits { write_rate = (float64|MaxFloat64) } } + RequestLimitsWriteRate rate.Limit + // RetryJoinIntervalLAN specifies the amount of time to wait in between join // attempts on agent start. The minimum allowed value is 1 second and // the default is 30s. diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index e5e571fee4..842cf16a01 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -20,6 +20,7 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" hcpconfig "github.com/hashicorp/consul/agent/hcp/config" @@ -27,6 +28,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/consul" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" @@ -4615,6 +4617,9 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { rt.HTTPSHandshakeTimeout = 5 * time.Second rt.HTTPMaxConnsPerClient = 200 rt.RPCMaxConnsPerClient = 100 + rt.RequestLimitsMode = consulrate.ModeDisabled + rt.RequestLimitsReadRate = rate.Inf + rt.RequestLimitsWriteRate = rate.Inf rt.SegmentLimit = 64 rt.XDSUpdateRateLimit = 250 }, @@ -6163,6 +6168,9 @@ func TestLoad_FullConfig(t *testing.T) { RaftTrailingLogs: 83749, ReconnectTimeoutLAN: 23739 * time.Second, ReconnectTimeoutWAN: 26694 * time.Second, + RequestLimitsMode: consulrate.ModePermissive, + RequestLimitsReadRate: 99.0, + RequestLimitsWriteRate: 101.0, RejoinAfterLeave: true, RetryJoinIntervalLAN: 8067 * time.Second, RetryJoinIntervalWAN: 28866 * time.Second, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 98317d3cf2..a568789c52 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -278,6 +278,9 @@ "ReconnectTimeoutLAN": "0s", "ReconnectTimeoutWAN": "0s", "RejoinAfterLeave": false, + "RequestLimitsMode": 0, + "RequestLimitsReadRate": 0, + "RequestLimitsWriteRate": 0, "RetryJoinIntervalLAN": "0s", "RetryJoinIntervalWAN": "0s", "RetryJoinLAN": [ diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index f499464193..f237451f0d 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -305,6 +305,11 @@ limits { rpc_max_conns_per_client = 2954 kv_max_value_size = 1234567800 txn_max_req_len = 567800000 + request_limits { + mode = "permissive" + read_rate = 99.0 + write_rate = 101.0 + } } log_level = "k1zo9Spt" log_json = true diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index e578e6f33e..6a07dff88d 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -9,25 +9,25 @@ "acl_replication_token": "LMmgy5dO", "acl_token": "O1El0wan", "acl_ttl": "18060s", - "acl" : { - "enabled" : true, - "down_policy" : "03eb2aee", - "default_policy" : "72c2e7a0", + "acl": { + "enabled": true, + "down_policy": "03eb2aee", + "default_policy": "72c2e7a0", "enable_key_list_policy": true, "enable_token_persistence": true, "policy_ttl": "1123s", "role_ttl": "9876s", "token_ttl": "3321s", - "enable_token_replication" : true, + "enable_token_replication": true, "msp_disable_bootstrap": true, - "tokens" : { - "master" : "8a19ac27", - "initial_management" : "3820e09a", - "agent_master" : "64fd0e08", - "agent_recovery" : "1dba6aba", - "replication" : "5795983a", - "agent" : "bed2377c", - "default" : "418fdff1", + "tokens": { + "master": "8a19ac27", + "initial_management": "3820e09a", + "agent_master": "64fd0e08", + "agent_recovery": "1dba6aba", + "replication": "5795983a", + "agent": "bed2377c", + "default": "418fdff1", "managed_service_provider": [ { "accessor_id": "first", @@ -57,9 +57,15 @@ "enabled": false, "intro_token": "OpBPGRwt", "intro_token_file": "gFvAXwI8", - "dns_sans": ["6zdaWg9J"], - "ip_sans": ["198.18.99.99"], - "server_addresses": ["198.18.100.1"], + "dns_sans": [ + "6zdaWg9J" + ], + "ip_sans": [ + "198.18.99.99" + ], + "server_addresses": [ + "198.18.100.1" + ], "authorization": { "enabled": true, "static": { @@ -71,9 +77,15 @@ "foo": "bar" }, "bound_issuer": "consul", - "bound_audiences": ["consul-cluster-1"], - "claim_assertions": ["value.node == \"${node}\""], - "jwt_validation_pub_keys": ["-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----"] + "bound_audiences": [ + "consul-cluster-1" + ], + "claim_assertions": [ + "value.node == \"${node}\"" + ], + "jwt_validation_pub_keys": [ + "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----" + ] } } }, @@ -82,7 +94,7 @@ "disable_upgrade_migration": true, "last_contact_threshold": "12705s", "max_trailing_logs": 17849, - "min_quorum": 3, + "min_quorum": 3, "redundancy_zone_tag": "3IsufDJf", "server_stabilization_time": "23057s", "upgrade_version_tag": "W9pDwFAL" @@ -104,11 +116,20 @@ "service_id": "L8G0QNmR", "token": "oo4BCTgJ", "status": "qLykAl5u", - "args": ["f3BemRjy", "e5zgpef7"], + "args": [ + "f3BemRjy", + "e5zgpef7" + ], "http": "29B93haH", "header": { - "hBq0zn1q": [ "2a9o9ZKP", "vKwA5lR6" ], - "f3r6xFtM": [ "RyuIdDWv", "QbxEcIUM" ] + "hBq0zn1q": [ + "2a9o9ZKP", + "vKwA5lR6" + ], + "f3r6xFtM": [ + "RyuIdDWv", + "QbxEcIUM" + ] }, "method": "Dou0nGT5", "body": "5PBQd2OT", @@ -134,11 +155,20 @@ "service_id": "lSulPcyz", "token": "toO59sh8", "status": "9RlWsXMV", - "args": ["4BAJttck", "4D2NPtTQ"], + "args": [ + "4BAJttck", + "4D2NPtTQ" + ], "http": "dohLcyQ2", "header": { - "ZBfTin3L": [ "1sDbEqYG", "lJGASsWK" ], - "Ui0nU99X": [ "LMccm3Qe", "k5H5RggQ" ] + "ZBfTin3L": [ + "1sDbEqYG", + "lJGASsWK" + ], + "Ui0nU99X": [ + "LMccm3Qe", + "k5H5RggQ" + ] }, "method": "aldrIQ4l", "body": "wSjTy7dg", @@ -163,11 +193,20 @@ "service_id": "CmUUcRna", "token": "a3nQzHuy", "status": "irj26nf3", - "args": ["9s526ogY", "gSlOHj1w"], + "args": [ + "9s526ogY", + "gSlOHj1w" + ], "http": "yzhgsQ7Y", "header": { - "zcqwA8dO": [ "qb1zx0DL", "sXCxPFsD" ], - "qxvdnSE9": [ "6wBPUYdF", "YYh8wtSZ" ] + "zcqwA8dO": [ + "qb1zx0DL", + "sXCxPFsD" + ], + "qxvdnSE9": [ + "6wBPUYdF", + "YYh8wtSZ" + ] }, "method": "gLrztrNw", "body": "0jkKgGUC", @@ -202,15 +241,21 @@ }, "auto_encrypt": { "tls": false, - "dns_san": ["a.com", "b.com"], - "ip_san": ["192.168.4.139", "192.168.4.140"], + "dns_san": [ + "a.com", + "b.com" + ], + "ip_san": [ + "192.168.4.139", + "192.168.4.140" + ], "allow_tls": true }, "cloud": { "resource_id": "N43DsscE", "client_id": "6WvsDZCP", - "client_secret": "lCSMHOpB", - "hostname": "DH4bh7aC", + "client_secret": "lCSMHOpB", + "hostname": "DH4bh7aC", "auth_url": "332nCdR2", "scada_address": "aoeusth232" }, @@ -226,21 +271,21 @@ "enable_mesh_gateway_wan_federation": false, "enabled": true }, - "gossip_lan" : { + "gossip_lan": { "gossip_nodes": 6, - "gossip_interval" : "25252s", - "retransmit_mult" : 1234, - "suspicion_mult" : 1235, - "probe_interval" : "101ms", - "probe_timeout" : "102ms" + "gossip_interval": "25252s", + "retransmit_mult": 1234, + "suspicion_mult": 1235, + "probe_interval": "101ms", + "probe_timeout": "102ms" }, - "gossip_wan" : { - "gossip_nodes" : 2, - "gossip_interval" : "6966s", - "retransmit_mult" : 16384, - "suspicion_mult" : 16385, - "probe_interval" : "103ms", - "probe_timeout" : "104ms" + "gossip_wan": { + "gossip_nodes": 2, + "gossip_interval": "6966s", + "retransmit_mult": 16384, + "suspicion_mult": 16385, + "probe_interval": "103ms", + "probe_timeout": "104ms" }, "datacenter": "rzo029wg", "default_query_time": "16743s", @@ -283,8 +328,15 @@ "encrypt_verify_incoming": true, "encrypt_verify_outgoing": true, "http_config": { - "block_endpoints": [ "RBvAFcGD", "fWOWFznh" ], - "allow_write_http_from": [ "127.0.0.1/8", "22.33.44.55/32", "0.0.0.0/0" ], + "block_endpoints": [ + "RBvAFcGD", + "fWOWFznh" + ], + "allow_write_http_from": [ + "127.0.0.1/8", + "22.33.44.55/32", + "0.0.0.0/0" + ], "response_headers": { "M6TKa9NP": "xjuxjOzQ", "JRCrHZed": "rl0mTx81" @@ -304,7 +356,12 @@ "rpc_max_burst": 44848, "rpc_max_conns_per_client": 2954, "kv_max_value_size": 1234567800, - "txn_max_req_len": 567800000 + "txn_max_req_len": 567800000, + "request_limits": { + "mode": "permissive", + "read_rate": 99.0, + "write_rate": 101.0 + } }, "log_level": "k1zo9Spt", "log_json": true, @@ -340,7 +397,10 @@ }, "protocol": 30793, "primary_datacenter": "ejtmd43d", - "primary_gateways": [ "aej8eeZo", "roh2KahS" ], + "primary_gateways": [ + "aej8eeZo", + "roh2KahS" + ], "primary_gateways_interval": "18866s", "raft_protocol": 3, "raft_snapshot_threshold": 16384, @@ -352,15 +412,26 @@ "read_replica": true, "reconnect_timeout": "23739s", "reconnect_timeout_wan": "26694s", - "recursors": [ "63.38.39.58", "92.49.18.18" ], + "recursors": [ + "63.38.39.58", + "92.49.18.18" + ], "rejoin_after_leave": true, "retry_interval": "8067s", "retry_interval_wan": "28866s", - "retry_join": [ "pbsSFY7U", "l0qLtWij" ], - "retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ], + "retry_join": [ + "pbsSFY7U", + "l0qLtWij" + ], + "retry_join_wan": [ + "PFsR02Ye", + "rJdQIhER" + ], "retry_max": 913, "retry_max_wan": 23160, - "rpc": {"enable_streaming": true}, + "rpc": { + "enable_streaming": true + }, "segment_limit": 123, "serf_lan": "99.43.63.15", "serf_wan": "67.88.33.19", @@ -382,7 +453,10 @@ "port": 6109 } }, - "tags": ["nkwshvM5", "NTDWn3ek"], + "tags": [ + "nkwshvM5", + "NTDWn3ek" + ], "address": "cOlSOhbp", "token": "msy7iWER", "port": 24237, @@ -396,11 +470,19 @@ "name": "iehanzuq", "status": "rCvn53TH", "notes": "fti5lfF3", - "args": ["16WRUmwS", "QWk7j7ae"], + "args": [ + "16WRUmwS", + "QWk7j7ae" + ], "http": "dl3Fgme3", "header": { - "rjm4DEd3": ["2m3m2Fls"], - "l4HwQ112": ["fk56MNlo", "dhLK56aZ"] + "rjm4DEd3": [ + "2m3m2Fls" + ], + "l4HwQ112": [ + "fk56MNlo", + "dhLK56aZ" + ] }, "method": "9afLm3Mj", "body": "wVVL2V6f", @@ -424,11 +506,20 @@ "name": "sgV4F7Pk", "notes": "yP5nKbW0", "status": "7oLMEyfu", - "args": ["5wEZtZpv", "0Ihyk8cS"], + "args": [ + "5wEZtZpv", + "0Ihyk8cS" + ], "http": "KyDjGY9H", "header": { - "gv5qefTz": [ "5Olo2pMG", "PvvKWQU5" ], - "SHOVq1Vv": [ "jntFhyym", "GYJh32pp" ] + "gv5qefTz": [ + "5Olo2pMG", + "PvvKWQU5" + ], + "SHOVq1Vv": [ + "jntFhyym", + "GYJh32pp" + ] }, "method": "T66MFBfR", "body": "OwGjTFQi", @@ -451,11 +542,20 @@ "name": "IEqrzrsd", "notes": "SVqApqeM", "status": "XXkVoZXt", - "args": ["wD05Bvao", "rLYB7kQC"], + "args": [ + "wD05Bvao", + "rLYB7kQC" + ], "http": "kyICZsn8", "header": { - "4ebP5vL4": [ "G20SrL5Q", "DwPKlMbo" ], - "p2UI34Qz": [ "UsG1D0Qh", "NHhRiB6s" ] + "4ebP5vL4": [ + "G20SrL5Q", + "DwPKlMbo" + ], + "p2UI34Qz": [ + "UsG1D0Qh", + "NHhRiB6s" + ] }, "method": "ciYHWors", "body": "lUVLGYU7", @@ -482,7 +582,10 @@ { "id": "wI1dzxS4", "name": "7IszXMQ1", - "tags": ["0Zwg8l6v", "zebELdN5"], + "tags": [ + "0Zwg8l6v", + "zebELdN5" + ], "address": "9RhqPSPB", "token": "myjKJkWH", "port": 72219, @@ -492,11 +595,19 @@ "name": "atDGP7n5", "status": "pDQKEhWL", "notes": "Yt8EDLev", - "args": ["81EDZLPa", "bPY5X8xd"], + "args": [ + "81EDZLPa", + "bPY5X8xd" + ], "http": "qzHYvmJO", "header": { - "UkpmZ3a3": ["2dfzXuxZ"], - "cVFpko4u": ["gGqdEB6k", "9LsRo22u"] + "UkpmZ3a3": [ + "2dfzXuxZ" + ], + "cVFpko4u": [ + "gGqdEB6k", + "9LsRo22u" + ] }, "method": "X5DrovFc", "body": "WeikigLh", @@ -521,7 +632,10 @@ { "id": "MRHVMZuD", "name": "6L6BVfgH", - "tags": ["7Ale4y6o", "PMBW08hy"], + "tags": [ + "7Ale4y6o", + "PMBW08hy" + ], "address": "R6H6g8h0", "token": "ZgY8gjMI", "port": 38292, @@ -536,11 +650,20 @@ "name": "9OOS93ne", "notes": "CQy86DH0", "status": "P0SWDvrk", - "args": ["EXvkYIuG", "BATOyt6h"], + "args": [ + "EXvkYIuG", + "BATOyt6h" + ], "http": "u97ByEiW", "header": { - "MUlReo8L": [ "AUZG7wHG", "gsN0Dc2N" ], - "1UJXjVrT": [ "OJgxzTfk", "xZZrFsq7" ] + "MUlReo8L": [ + "AUZG7wHG", + "gsN0Dc2N" + ], + "1UJXjVrT": [ + "OJgxzTfk", + "xZZrFsq7" + ] }, "method": "5wkAxCUE", "body": "7CRjCJyz", @@ -621,8 +744,8 @@ "destination_namespace": "9nakw0td", "destination_partition": "part-9nakw0td", "destination_type": "prepared_query", - "local_bind_socket_path": "/foo/bar/upstream", - "local_bind_socket_mode": "0600" + "local_bind_socket_path": "/foo/bar/upstream", + "local_bind_socket_mode": "0600" } ] } @@ -634,15 +757,21 @@ "port": 27147, "proxy": { "config": { - "1CuJHVfw" : "Kzqsa7yc" + "1CuJHVfw": "Kzqsa7yc" } } } ], "session_ttl_min": "26627s", "skip_leave_on_interrupt": true, - "start_join": [ "LR3hGDoG", "MwVpZ4Up" ], - "start_join_wan": [ "EbFSc3nA", "kwXTh623" ], + "start_join": [ + "LR3hGDoG", + "MwVpZ4Up" + ], + "start_join_wan": [ + "EbFSc3nA", + "kwXTh623" + ], "syslog_facility": "hHv79Uia", "tagged_addresses": { "7MYgHrYH": "dALJAhLD", @@ -664,10 +793,16 @@ "circonus_submission_url": "gTcbS93G", "disable_hostname": true, "dogstatsd_addr": "0wSndumK", - "dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ], + "dogstatsd_tags": [ + "3N81zSUB", + "Xtj8AnXZ" + ], "retry_failed_connection": true, "filter_default": true, - "prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ], + "prefix_filter": [ + "+oJotS8XJ", + "-cazlEhGn" + ], "metrics_prefix": "ftO6DySn", "prometheus_retention_time": "15s", "statsd_address": "drce87cy", @@ -722,7 +857,10 @@ "dir": "pVncV4Ey", "content_path": "qp1WRhYH", "metrics_provider": "sgnaoa_lower_case", - "metrics_provider_files": ["sgnaMFoa", "dicnwkTH"], + "metrics_provider_files": [ + "sgnaMFoa", + "dicnwkTH" + ], "metrics_provider_options_json": "{\"DIbVQadX\": 1}", "metrics_proxy": { "base_url": "http://foo.bar", @@ -732,7 +870,10 @@ "value": "TYBgnN2F" } ], - "path_allowlist": ["/aSh3cu", "/eiK/2Th"] + "path_allowlist": [ + "/aSh3cu", + "/eiK/2Th" + ] }, "dashboard_url_templates": { "u2eziu2n_lower_case": "http://lkjasd.otr" @@ -754,11 +895,15 @@ "datacenter": "GyE6jpeW", "key": "j9lF1Tve", "handler": "90N7S4LN" - }, { + }, + { "type": "keyprefix", "datacenter": "fYrl3F5d", "key": "sl3Dffu7", - "args": ["dltjDJ2a", "flEa7C2d"] + "args": [ + "dltjDJ2a", + "flEa7C2d" + ] } ], "xds": { diff --git a/agent/consul/config.go b/agent/consul/config.go index 72bd9b3960..b76abdff71 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -12,6 +12,7 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/checks" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/structs" libserf "github.com/hashicorp/consul/lib/serf" "github.com/hashicorp/consul/tlsutil" @@ -318,6 +319,25 @@ type Config struct { // CheckOutputMaxSize control the max size of output of checks CheckOutputMaxSize int + // RequestLimitsMode will disable or enable rate limiting. If not disabled, it + // enforces the action that will occur when RequestLimitsReadRate + // or RequestLimitsWriteRate is exceeded. The default value of "disabled" will + // prevent any rate limiting from occuring. A value of "enforce" will block + // the request from processings by returning an error. A value of + // "permissive" will not block the request and will allow the request to + // continue processing. + RequestLimitsMode string + + // RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP + // queries are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsReadRate tokens per second. + RequestLimitsReadRate rate.Limit + + // RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP + // writes are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsWriteRate tokens per second. + RequestLimitsWriteRate rate.Limit + // RPCHandshakeTimeout limits how long we will wait for the initial magic byte // on an RPC client connection. It also governs how long we will wait for a // TLS handshake when TLS is configured however the timout applies separately @@ -501,6 +521,10 @@ func DefaultConfig() *Config { CheckOutputMaxSize: checks.DefaultBufSize, + RequestLimitsMode: "disabled", + RequestLimitsReadRate: rate.Inf, // ops / sec + RequestLimitsWriteRate: rate.Inf, // ops / sec + RPCRateLimit: rate.Inf, RPCMaxBurst: 1000, @@ -620,9 +644,18 @@ type RPCConfig struct { EnableStreaming bool } +// RequestLimits is configuration for serverrate limiting that is a part of +// ReloadableConfig. +type RequestLimits struct { + Mode consulrate.Mode + ReadRate rate.Limit + WriteRate rate.Limit +} + // ReloadableConfig is the configuration that is passed to ReloadConfig when // application config is reloaded. type ReloadableConfig struct { + RequestLimits *RequestLimits RPCClientTimeout time.Duration RPCRateLimit rate.Limit RPCMaxBurst int diff --git a/agent/consul/multilimiter/multilimiter.go b/agent/consul/multilimiter/multilimiter.go index 9de1bdaa14..539cced1c2 100644 --- a/agent/consul/multilimiter/multilimiter.go +++ b/agent/consul/multilimiter/multilimiter.go @@ -3,12 +3,11 @@ package multilimiter import ( "bytes" "context" + radix "github.com/hashicorp/go-immutable-radix" + "golang.org/x/time/rate" "sync" "sync/atomic" "time" - - radix "github.com/hashicorp/go-immutable-radix" - "golang.org/x/time/rate" ) var _ RateLimiter = &MultiLimiter{} diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index 189c0fa0f9..de20ab668b 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -30,14 +30,51 @@ var ( type Mode int const ( + // ModeDisabled causes rate limiting to be bypassed. + ModeDisabled Mode = iota + // ModePermissive causes the handler to log the rate-limited operation but // still allow it to proceed. - ModePermissive Mode = iota + ModePermissive - // ModeEnforcing causes the handler to reject the rate-limted operation. + // ModeEnforcing causes the handler to reject the rate-limited operation. ModeEnforcing ) +var modeToName = map[Mode]string{ + ModeDisabled: "disabled", + ModeEnforcing: "enforcing", + ModePermissive: "permissive", +} +var modeFromName = func() map[string]Mode { + vals := map[string]Mode{ + "": ModeDisabled, + } + for k, v := range modeToName { + vals[v] = k + } + return vals +}() + +func (m Mode) String() string { + return modeToName[m] +} + +// RequestLimitsModeFromName will unmarshal the string form of a configMode. +func RequestLimitsModeFromName(name string) (Mode, bool) { + s, ok := modeFromName[name] + return s, ok +} + +// RequestLimitsModeFromNameWithDefault will unmarshal the string form of a configMode. +func RequestLimitsModeFromNameWithDefault(name string) Mode { + s, ok := modeFromName[name] + if !ok { + return ModePermissive + } + return s +} + // OperationType is the type of operation the client is attempting to perform. type OperationType int @@ -61,6 +98,13 @@ type Operation struct { Type OperationType } +//go:generate mockery --name RequestLimitsHandler --inpackage --filename mock_RequestLimitsHandler_test.go +type RequestLimitsHandler interface { + Run(ctx context.Context) + Allow(op Operation) error + UpdateConfig(cfg HandlerConfig) +} + // Handler enforces rate limits for incoming RPCs. type Handler struct { cfg *atomic.Pointer[HandlerConfig] @@ -127,7 +171,6 @@ func (h *Handler) Allow(op Operation) error { return nil } -// TODO(NET-1379): call this on `consul reload`. func (h *Handler) UpdateConfig(cfg HandlerConfig) { h.cfg.Store(&cfg) h.limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite) @@ -149,3 +192,16 @@ type globalLimit []byte func (prefix globalLimit) Key() multilimiter.KeyType { return multilimiter.Key(prefix, nil) } + +// NullRateLimiter returns a RateLimiter that allows every operation. +func NullRateLimiter() RequestLimitsHandler { + return nullRateLimiter{} +} + +type nullRateLimiter struct{} + +func (nullRateLimiter) Allow(Operation) error { return nil } + +func (nullRateLimiter) Run(ctx context.Context) {} + +func (nullRateLimiter) UpdateConfig(cfg HandlerConfig) {} diff --git a/agent/consul/rate/mock_RequestLimitsHandler.go b/agent/consul/rate/mock_RequestLimitsHandler.go new file mode 100644 index 0000000000..02569e56e5 --- /dev/null +++ b/agent/consul/rate/mock_RequestLimitsHandler.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.15.0. DO NOT EDIT. + +package rate + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockRequestLimitsHandler is an autogenerated mock type for the RequestLimitsHandler type +type MockRequestLimitsHandler struct { + mock.Mock +} + +// Allow provides a mock function with given fields: op +func (_m *MockRequestLimitsHandler) Allow(op Operation) error { + ret := _m.Called(op) + + var r0 error + if rf, ok := ret.Get(0).(func(Operation) error); ok { + r0 = rf(op) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Run provides a mock function with given fields: ctx +func (_m *MockRequestLimitsHandler) Run(ctx context.Context) { + _m.Called(ctx) +} + +// UpdateConfig provides a mock function with given fields: cfg +func (_m *MockRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) { + _m.Called(cfg) +} + +type mockConstructorTestingTNewMockRequestLimitsHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRequestLimitsHandler creates a new instance of MockRequestLimitsHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRequestLimitsHandler(t mockConstructorTestingTNewMockRequestLimitsHandler) *MockRequestLimitsHandler { + mock := &MockRequestLimitsHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 3ac9fbb254..c37f415e50 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "errors" "fmt" - "github.com/hashicorp/consul/agent/consul/multilimiter" "io" "net" "os" @@ -35,6 +34,7 @@ import ( "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/multilimiter" rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" @@ -144,6 +144,8 @@ const ( PoolKindSegment = "segment" ) +const requestLimitsBurstMultiplier = 10 + // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { @@ -279,7 +281,7 @@ type Server struct { rpcServer *rpc.Server // incomingRPCLimiter rate-limits incoming net/rpc and gRPC calls. - incomingRPCLimiter *rpcRate.Handler + incomingRPCLimiter rpcRate.RequestLimitsHandler // insecureRPCServer is a RPC server that is configure with // IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign @@ -396,6 +398,7 @@ type Server struct { EnterpriseServer operatorServer *operator.Server } + type connHandler interface { Run() error Handle(conn net.Conn) @@ -469,12 +472,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser }) // TODO(NET-1380, NET-1381): thread this into the net/rpc and gRPC interceptors. - s.incomingRPCLimiter = rpcRate.NewHandler(rpcRate.HandlerConfig{ - // TODO(server-rate-limit): revisit those value based on the multilimiter final implementation - Config: multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second}, - // TODO(NET-1379): pass in _real_ configuration. - GlobalMode: rpcRate.ModePermissive, - }, s) + if s.incomingRPCLimiter == nil { + mlCfg := &multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second} + limitsConfig := &RequestLimits{ + Mode: rpcRate.RequestLimitsModeFromNameWithDefault(config.RequestLimitsMode), + ReadRate: config.RequestLimitsReadRate, + WriteRate: config.RequestLimitsWriteRate, + } + + s.incomingRPCLimiter = rpcRate.NewHandler(*s.convertConsulConfigToRateLimitHandlerConfig(*limitsConfig, mlCfg), s) + } s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) var recorder *middleware.RequestRecorder @@ -1680,6 +1687,11 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error { } s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) + + if config.RequestLimits != nil { + s.incomingRPCLimiter.UpdateConfig(*s.convertConsulConfigToRateLimitHandlerConfig(*config.RequestLimits, nil)) + } + s.rpcConnLimiter.SetConfig(connlimit.Config{ MaxConnsPerClientIP: config.RPCMaxConnsPerClient, }) @@ -1830,10 +1842,31 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { } } +// convertConsulConfigToRateLimitHandlerConfig creates a rate limite handler config +// from the relevant fields in the consul runtime config. +func (s *Server) convertConsulConfigToRateLimitHandlerConfig(limitsConfig RequestLimits, multilimiterConfig *multilimiter.Config) *rpcRate.HandlerConfig { + hc := &rpcRate.HandlerConfig{ + GlobalMode: limitsConfig.Mode, + GlobalReadConfig: multilimiter.LimiterConfig{ + Rate: limitsConfig.ReadRate, + Burst: int(limitsConfig.ReadRate) * requestLimitsBurstMultiplier, + }, + GlobalWriteConfig: multilimiter.LimiterConfig{ + Rate: limitsConfig.WriteRate, + Burst: int(limitsConfig.WriteRate) * requestLimitsBurstMultiplier, + }, + } + if multilimiterConfig != nil { + hc.Config = *multilimiterConfig + } + + return hc +} + // IncomingRPCLimiter returns the server's configured rate limit handler for // incoming RPCs. This is necessary because the external gRPC server is created // by the agent (as it is also used for xDS). -func (s *Server) IncomingRPCLimiter() *rpcRate.Handler { return s.incomingRPCLimiter } +func (s *Server) IncomingRPCLimiter() rpcRate.RequestLimitsHandler { return s.incomingRPCLimiter } // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index b1be492cd2..4cb7b5d97e 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -29,6 +29,8 @@ import ( "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/multilimiter" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" external "github.com/hashicorp/consul/agent/grpc-external" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/metadata" @@ -331,7 +333,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) { oldNotify() } } - grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, grpcmiddleware.NullRateLimiter()) + grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRateLimiter()) srv, err := NewServer(c, deps, grpcServer) if err != nil { return nil, err @@ -1818,6 +1820,9 @@ func TestServer_ReloadConfig(t *testing.T) { c.Build = "1.5.0" c.RPCRateLimit = 500 c.RPCMaxBurst = 5000 + c.RequestLimitsMode = "permissive" + c.RequestLimitsReadRate = 500 + c.RequestLimitsWriteRate = 500 c.RPCClientTimeout = 60 * time.Second // Set one raft param to be non-default in the initial config, others are // default. @@ -1835,6 +1840,11 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout()) rc := ReloadableConfig{ + RequestLimits: &RequestLimits{ + Mode: rpcRate.ModeEnforcing, + ReadRate: 1000, + WriteRate: 1100, + }, RPCClientTimeout: 2 * time.Minute, RPCRateLimit: 1000, RPCMaxBurst: 10000, @@ -1848,6 +1858,11 @@ func TestServer_ReloadConfig(t *testing.T) { // Leave other raft fields default } + + mockHandler := rpcRate.NewMockRequestLimitsHandler(t) + mockHandler.On("UpdateConfig", mock.Anything).Return(func(cfg rpcRate.HandlerConfig) {}) + + s.incomingRPCLimiter = mockHandler require.NoError(t, s.ReloadConfig(rc)) _, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMetaInDefaultPartition()) @@ -1864,6 +1879,19 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, rate.Limit(1000), limiter.Limit()) require.Equal(t, 10000, limiter.Burst()) + // Check the incoming RPC rate limiter got updated + mockHandler.AssertCalled(t, "UpdateConfig", rpcRate.HandlerConfig{ + GlobalMode: rc.RequestLimits.Mode, + GlobalReadConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.ReadRate, + Burst: int(rc.RequestLimits.ReadRate) * requestLimitsBurstMultiplier, + }, + GlobalWriteConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.WriteRate, + Burst: int(rc.RequestLimits.WriteRate) * requestLimitsBurstMultiplier, + }, + }) + // Check RPC client timeout got updated require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout()) diff --git a/agent/grpc-external/server.go b/agent/grpc-external/server.go index 98de599c8c..3ea687f8ca 100644 --- a/agent/grpc-external/server.go +++ b/agent/grpc-external/server.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "github.com/hashicorp/consul/agent/consul/rate" agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/tlsutil" ) @@ -23,7 +24,7 @@ var ( // NewServer constructs a gRPC server for the external gRPC port, to which // handlers can be registered. -func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter agentmiddleware.RateLimiter) *grpc.Server { +func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter rate.RequestLimitsHandler) *grpc.Server { if metricsObj == nil { metricsObj = metrics.Default() } diff --git a/agent/grpc-external/stats_test.go b/agent/grpc-external/stats_test.go index c62eb65593..40d8509969 100644 --- a/agent/grpc-external/stats_test.go +++ b/agent/grpc-external/stats_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/consul/rate" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/grpc-middleware/testutil" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" @@ -23,7 +24,7 @@ import ( func TestServer_EmitsStats(t *testing.T) { sink, metricsObj := testutil.NewFakeSink(t) - srv := NewServer(hclog.Default(), metricsObj, nil, grpcmiddleware.NullRateLimiter()) + srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRateLimiter()) testservice.RegisterSimpleServer(srv, &testservice.Simple{}) diff --git a/agent/grpc-internal/handler.go b/agent/grpc-internal/handler.go index fc563df4ed..f78e17c53b 100644 --- a/agent/grpc-internal/handler.go +++ b/agent/grpc-internal/handler.go @@ -11,6 +11,7 @@ import ( middleware "github.com/grpc-ecosystem/go-grpc-middleware" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashicorp/consul/agent/consul/rate" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) @@ -25,7 +26,7 @@ var ( // NewHandler returns a gRPC server that accepts connections from Handle(conn). // The register function will be called with the grpc.Server to register // gRPC services with the server. -func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter agentmiddleware.RateLimiter) *Handler { +func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter rate.RequestLimitsHandler) *Handler { if metricsObj == nil { metricsObj = metrics.Default() } diff --git a/agent/grpc-internal/server_test.go b/agent/grpc-internal/server_test.go index 1cd66ec0c0..4e76ac5940 100644 --- a/agent/grpc-internal/server_test.go +++ b/agent/grpc-internal/server_test.go @@ -14,7 +14,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" @@ -55,7 +55,7 @@ func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsC func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(logger, addr, register, nil, middleware.NullRateLimiter()) + handler := NewHandler(logger, addr, register, nil, rate.NullRateLimiter()) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) diff --git a/agent/grpc-internal/services/subscribe/subscribe_test.go b/agent/grpc-internal/services/subscribe/subscribe_test.go index c304543f41..2ef6a41e82 100644 --- a/agent/grpc-internal/services/subscribe/subscribe_test.go +++ b/agent/grpc-internal/services/subscribe/subscribe_test.go @@ -19,10 +19,10 @@ import ( "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" grpc "github.com/hashicorp/consul/agent/grpc-internal" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbcommon" @@ -381,7 +381,7 @@ func runTestServer(t *testing.T, server *Server) net.Addr { pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server) }, nil, - middleware.NullRateLimiter(), + rate.NullRateLimiter(), ) lis, err := net.Listen("tcp", "127.0.0.1:0") diff --git a/agent/grpc-internal/stats_test.go b/agent/grpc-internal/stats_test.go index 2672beba46..13f71b79b6 100644 --- a/agent/grpc-internal/stats_test.go +++ b/agent/grpc-internal/stats_test.go @@ -14,7 +14,7 @@ import ( "github.com/hashicorp/go-hclog" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/grpc-middleware/testutil" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/proto/prototest" @@ -26,7 +26,7 @@ func TestHandler_EmitsStats(t *testing.T) { sink, metricsObj := testutil.NewFakeSink(t) addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, middleware.NullRateLimiter()) + handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, rate.NullRateLimiter()) testservice.RegisterSimpleServer(handler.srv, &testservice.Simple{}) diff --git a/agent/grpc-middleware/mock_RateLimiter.go b/agent/grpc-middleware/mock_RateLimiter.go deleted file mode 100644 index 9f427b7bc6..0000000000 --- a/agent/grpc-middleware/mock_RateLimiter.go +++ /dev/null @@ -1,39 +0,0 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. - -package middleware - -import ( - testing "testing" - - rate "github.com/hashicorp/consul/agent/consul/rate" - mock "github.com/stretchr/testify/mock" -) - -// MockRateLimiter is an autogenerated mock type for the RateLimiter type -type MockRateLimiter struct { - mock.Mock -} - -// Allow provides a mock function with given fields: _a0 -func (_m *MockRateLimiter) Allow(_a0 rate.Operation) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(rate.Operation) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewMockRateLimiter creates a new instance of MockRateLimiter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRateLimiter(t testing.TB) *MockRateLimiter { - mock := &MockRateLimiter{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/agent/grpc-middleware/rate.go b/agent/grpc-middleware/rate.go index 5683d69404..d2254000f9 100644 --- a/agent/grpc-middleware/rate.go +++ b/agent/grpc-middleware/rate.go @@ -17,7 +17,7 @@ import ( // ServerRateLimiterMiddleware implements a ServerInHandle function to perform // RPC rate limiting at the cheapest possible point (before the full request has // been decoded). -func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle { +func ServerRateLimiterMiddleware(limiter rate.RequestLimitsHandler, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle { return func(ctx context.Context, info *tap.Info) (_ context.Context, retErr error) { // This function is called before unary and stream RPC interceptors, so we // must handle our own panics here. @@ -56,17 +56,3 @@ func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.Reco } } } - -//go:generate mockery --name RateLimiter --inpackage -type RateLimiter interface { - Allow(rate.Operation) error -} - -// NullRateLimiter returns a RateLimiter that allows every operation. -func NullRateLimiter() RateLimiter { - return nullRateLimiter{} -} - -type nullRateLimiter struct{} - -func (nullRateLimiter) Allow(rate.Operation) error { return nil } diff --git a/agent/grpc-middleware/rate_test.go b/agent/grpc-middleware/rate_test.go index 1c5d417047..2d9c1eb3e5 100644 --- a/agent/grpc-middleware/rate_test.go +++ b/agent/grpc-middleware/rate_test.go @@ -21,7 +21,7 @@ import ( ) func TestServerRateLimiterMiddleware_Integration(t *testing.T) { - limiter := NewMockRateLimiter(t) + limiter := rate.NewMockRequestLimitsHandler(t) server := grpc.NewServer( grpc.InTapHandle(ServerRateLimiterMiddleware(limiter, NewPanicHandler(hclog.NewNullLogger()))), diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index c971ccf72c..029bfa2c50 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" external "github.com/hashicorp/consul/agent/grpc-external" @@ -1591,7 +1592,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta() deps := newDefaultDeps(t, conf) - externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, agentmiddleware.NullRateLimiter()) + externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRateLimiter()) server, err := consul.NewServer(conf, deps, externalGRPCServer) require.NoError(t, err) diff --git a/docs/config/checklist-adding-config-fields.md b/docs/config/checklist-adding-config-fields.md index 7a47eb8415..59737f7a4e 100644 --- a/docs/config/checklist-adding-config-fields.md +++ b/docs/config/checklist-adding-config-fields.md @@ -46,7 +46,7 @@ There are four specific cases covered with increasing complexity: - [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in `agent/config/runtime_test.go`. - [ ] If the config needs to be defaulted for the test server used in unit tests, - also add it to `DefaultConfig()` in `agent/consul/defaults.go`. + also add it to `DefaultConfig()` in `agent/consul/config.go`. - [ ] **If** your config should take effect on a reload/HUP. - [ ] Add necessary code to to trigger a safe (locked or atomic) update to any state the feature needs changing. This needs to be added to one or diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index 31809605af..f562d1605e 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -541,6 +541,10 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`. - `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`. + - `request_limits` - This object povides configuration for rate limiting RPC and gRPC requests on the consul server. As a result of rate limiting gRPC and RPC request, HTTP requests to the Consul server are rate limited. + - `mode` - Configures whether rate limiting is enabled or not as well as how it behaves through the use of 3 possible modes. The default value of "disabled" will prevent any rate limiting from occuring. A value of "permissive" will cause the system to track requests against the `read_rate` and `write_rate` but will only log violations and will not block and will allow the request to continue processing. A value of "enforcing" also tracks requests against the `read_rate` and `write_rate` but in addition to logging violations, the system will block the request from processings by returning an error. + - `read_rate` - Configures how frequently RPC, gRPC, and HTTP queries are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. + - `write_rate` - Configures how frequently RPC, gRPC, and HTTP write are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. - `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`. - `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`. - `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.