diff --git a/agent/agent.go b/agent/agent.go index 3fa70b77cb..d8ce397363 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -722,6 +722,14 @@ func (a *Agent) consulConfig() (*consul.Config, error) { base.RPCAdvertise = base.RPCAddr } + // Rate limiting for RPC calls. + if a.config.Limits.RPCRate > 0 { + base.RPCRate = a.config.Limits.RPCRate + } + if a.config.Limits.RPCMaxBurst > 0 { + base.RPCMaxBurst = a.config.Limits.RPCMaxBurst + } + // set the src address for outgoing rpc connections // Use port 0 so that outgoing connections use a random port. if !ipaddr.IsAny(base.RPCAddr.IP) { diff --git a/agent/config.go b/agent/config.go index 8454baa852..e7c6716e31 100644 --- a/agent/config.go +++ b/agent/config.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-sockaddr/template" "github.com/mitchellh/mapstructure" + "golang.org/x/time/rate" ) const ( @@ -201,6 +202,20 @@ type RetryJoinAzure struct { SecretAccessKey string `mapstructure:"secret_access_key" json:"-"` } +// Limits is used to configure limits enforced by the agent. +type Limits struct { + // RPCRate and RPCMaxBurst control how frequently RPC calls are allowed + // to happen. In any large enough time interval, rate limiter limits the + // rate to RPCRate tokens per second, with a maximum burst size of + // RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite + // rate), RPCMaxBurst is ignored. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + RPCRate rate.Limit `mapstructure:"rpc_rate"` + RPCMaxBurst int `mapstructure:"rpc_max_burst"` +} + // Performance is used to tune the performance of Consul's subsystems. type Performance struct { // RaftMultiplier is an integer multiplier used to scale Raft timing @@ -378,6 +393,9 @@ type Config struct { // server with minimal configuration. Useful for developing Consul. DevMode bool `mapstructure:"-"` + // Limits is used to configure limits enforced by the agent. + Limits Limits `mapstructure:"limits"` + // Performance is used to tune the performance of Consul's subsystems. Performance Performance `mapstructure:"performance"` @@ -956,6 +974,10 @@ type dirEnts []os.FileInfo // DefaultConfig is used to return a sane default configuration func DefaultConfig() *Config { return &Config{ + Limits: Limits{ + RPCRate: rate.Inf, + RPCMaxBurst: 1000, + }, Bootstrap: false, BootstrapExpect: 0, Server: false, @@ -1639,6 +1661,13 @@ func DecodeCheckDefinition(raw interface{}) (*structs.CheckDefinition, error) { func MergeConfig(a, b *Config) *Config { var result Config = *a + if b.Limits.RPCRate > 0 { + result.Limits.RPCRate = b.Limits.RPCRate + } + if b.Limits.RPCMaxBurst > 0 { + result.Limits.RPCMaxBurst = b.Limits.RPCMaxBurst + } + // Propagate non-default performance settings if b.Performance.RaftMultiplier > 0 { result.Performance.RaftMultiplier = b.Performance.RaftMultiplier diff --git a/agent/config_test.go b/agent/config_test.go index 5099c20073..35a17dc1bb 100644 --- a/agent/config_test.go +++ b/agent/config_test.go @@ -417,6 +417,10 @@ func TestDecodeConfig(t *testing.T) { in: `{"leave_on_terminate":true}`, c: &Config{LeaveOnTerm: Bool(true)}, }, + { + in: `{"limits": {"rpc_rate": 100, "rpc_max_burst": 50}}}`, + c: &Config{Limits: Limits{RPCRate: 100, RPCMaxBurst: 50}}, + }, { in: `{"log_level":"a"}`, c: &Config{LogLevel: "a"}, @@ -1359,6 +1363,10 @@ func TestMergeConfig(t *testing.T) { } b := &Config{ + Limits: Limits{ + RPCRate: 100, + RPCMaxBurst: 50, + }, Performance: Performance{ RaftMultiplier: 99, }, diff --git a/agent/consul/client.go b/agent/consul/client.go index 48b3a57232..35badda4a6 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -9,11 +9,13 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" ) const ( @@ -52,6 +54,10 @@ type Client struct { // Consul servers this agent uses for RPC requests routers *router.Manager + // rpcLimiter is used to rate limit the total number of RPCs initiated + // from an agent. + rpcLimiter *rate.Limiter + // eventCh is used to receive events from the // serf cluster in the datacenter eventCh chan serf.Event @@ -115,10 +121,11 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { ForceTLS: config.VerifyOutgoing, } - // Create server + // Create client c := &Client{ config: config, connPool: connPool, + rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst), eventCh: make(chan serf.Event, serfEventBacklog), logger: logger, shutdownCh: make(chan struct{}), @@ -226,7 +233,14 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return structs.ErrNoServers } - // Forward to remote Consul + // Enforce the RPC limit. + metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) + if !c.rpcLimiter.Allow() { + metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) + return structs.ErrRPCRateExceeded + } + + // Make the request. if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil { c.routers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) @@ -241,13 +255,18 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // operation. func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error { - - // Locate a server to make the request to. server := c.routers.FindServer() if server == nil { return structs.ErrNoServers } + // Enforce the RPC limit. + metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) + if !c.rpcLimiter.Allow() { + metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) + return structs.ErrRPCRateExceeded + } + // Request the operation. var reply structs.SnapshotResponse snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, server.UseTLS, args, in, &reply) diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index ba29912750..684ced39ef 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -333,6 +333,36 @@ func TestClient_RPC_TLS(t *testing.T) { }) } +func TestClient_RPC_RateLimit(t *testing.T) { + t.Parallel() + dir1, conf1 := testServerConfig(t) + s1, err := NewServer(conf1) + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, conf2 := testClientConfig(t) + conf2.RPCRate = 2 + conf2.RPCMaxBurst = 2 + c1, err := NewClient(conf2) + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + joinLAN(t, c1, s1) + retry.Run(t, func(r *retry.R) { + var out struct{} + if err := c1.RPC("Status.Ping", struct{}{}, &out); err != structs.ErrRPCRateExceeded { + r.Fatalf("err: %v", err) + } + }) +} + func TestClient_SnapshotRPC(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) @@ -348,9 +378,6 @@ func TestClient_SnapshotRPC(t *testing.T) { // Try to join. joinLAN(t, c1, s1) - if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 { - t.Fatalf("Server has %v of %v expected members; Client has %v of %v expected members.", len(s1.LANMembers()), 2, len(c1.LANMembers()), 2) - } // Wait until we've got a healthy server. retry.Run(t, func(r *retry.R) { @@ -376,6 +403,42 @@ func TestClient_SnapshotRPC(t *testing.T) { } } +func TestClient_SnapshotRPC_RateLimit(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, conf1 := testClientConfig(t) + conf1.RPCRate = 2 + conf1.RPCMaxBurst = 2 + c1, err := NewClient(conf1) + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + joinLAN(t, c1, s1) + retry.Run(t, func(r *retry.R) { + if got, want := c1.routers.NumServers(), 1; got != want { + r.Fatalf("got %d servers want %d", got, want) + } + }) + + retry.Run(t, func(r *retry.R) { + var snap bytes.Buffer + args := structs.SnapshotRequest{ + Datacenter: "dc1", + Op: structs.SnapshotSave, + } + if err := c1.SnapshotRPC(&args, bytes.NewReader([]byte("")), &snap, nil); err != structs.ErrRPCRateExceeded { + r.Fatalf("err: %v", err) + } + }) +} + func TestClient_SnapshotRPC_TLS(t *testing.T) { t.Parallel() dir1, conf1 := testServerConfig(t) diff --git a/agent/consul/config.go b/agent/consul/config.go index 030b7c488e..f018473105 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" ) const ( @@ -312,6 +313,17 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration + // RPCRate and RPCMaxBurst control how frequently RPC calls are allowed + // to happen. In any large enough time interval, rate limiter limits the + // rate to RPCRate tokens per second, with a maximum burst size of + // RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite + // rate), RPCMaxBurst is ignored. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + RPCRate rate.Limit + RPCMaxBurst int + // AutopilotConfig is used to apply the initial autopilot config when // bootstrapping. AutopilotConfig *structs.AutopilotConfig @@ -395,6 +407,9 @@ func DefaultConfig() *Config { // than enough when running in the high performance mode. RPCHoldTimeout: 7 * time.Second, + RPCRate: rate.Inf, + RPCMaxBurst: 1000, + TLSMinVersion: "tls10", AutopilotConfig: &structs.AutopilotConfig{ diff --git a/agent/http.go b/agent/http.go index 0bf5dffeb6..10527ea86d 100644 --- a/agent/http.go +++ b/agent/http.go @@ -236,6 +236,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque case acl.IsErrPermissionDenied(err) || acl.IsErrNotFound(err): resp.WriteHeader(http.StatusForbidden) fmt.Fprint(resp, err.Error()) + case structs.IsErrRPCRateExceeded(err): + resp.WriteHeader(http.StatusTooManyRequests) + fmt.Fprint(resp, err.Error()) default: resp.WriteHeader(http.StatusInternalServerError) fmt.Fprint(resp, err.Error()) diff --git a/agent/structs/errors.go b/agent/structs/errors.go new file mode 100644 index 0000000000..fcf6dafe92 --- /dev/null +++ b/agent/structs/errors.go @@ -0,0 +1,28 @@ +package structs + +import ( + "errors" + "strings" +) + +const ( + errNoLeader = "No cluster leader" + errNoDCPath = "No path to datacenter" + errNoServers = "No known Consul servers" + errNotReadyForConsistentReads = "Not ready to serve consistent reads" + errSegmentsNotSupported = "Network segments are not supported in this version of Consul" + errRPCRateExceeded = "RPC rate limit exceeded" +) + +var ( + ErrNoLeader = errors.New(errNoLeader) + ErrNoDCPath = errors.New(errNoDCPath) + ErrNoServers = errors.New(errNoServers) + ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads) + ErrSegmentsNotSupported = errors.New(errSegmentsNotSupported) + ErrRPCRateExceeded = errors.New(errRPCRateExceeded) +) + +func IsErrRPCRateExceeded(err error) bool { + return strings.Contains(err.Error(), errRPCRateExceeded) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e248302cdf..146ecb7e13 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -15,14 +15,6 @@ import ( "github.com/hashicorp/serf/coordinate" ) -var ( - ErrNoLeader = fmt.Errorf("No cluster leader") - ErrNoDCPath = fmt.Errorf("No path to datacenter") - ErrNoServers = fmt.Errorf("No known Consul servers") - ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads") - ErrSegmentsNotSupported = fmt.Errorf("Network segments are not supported in this version of Consul") -) - type MessageType uint8 // RaftIndex is used to track the index used while creating diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 0000000000..eabcd11474 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,380 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// contextContext is a temporary(?) copy of the context.Context type +// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ +// with the built-in context package. If people ever stop using Go 1.6 +// we can remove this. +type contextContext interface { + Deadline() (deadline time.Time, ok bool) + Done() <-chan struct{} + Err() error + Value(key interface{}) interface{} +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) wait(ctx contextContext) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/rate_go16.go b/vendor/golang.org/x/time/rate/rate_go16.go new file mode 100644 index 0000000000..6bab1850f8 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go16.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package rate + +import "golang.org/x/net/context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/golang.org/x/time/rate/rate_go17.go b/vendor/golang.org/x/time/rate/rate_go17.go new file mode 100644 index 0000000000..f90d85f51e --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go17.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package rate + +import "context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 36a9232187..41ba07ac48 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -83,7 +83,8 @@ {"checksumSHA1":"t24KnvC9jRxiANVhpw2pqFpmEu8=","path":"github.com/tonnerre/golang-text","revision":"048ed3d792f7104850acbc8cfc01e5a6070f4c04","revisionTime":"2013-09-25T19:58:46Z"}, {"checksumSHA1":"9jjO5GjLa0XF/nfWihF02RoH4qc=","path":"golang.org/x/net/context","revision":"075e191f18186a8ff2becaf64478e30f4545cdad","revisionTime":"2016-08-05T06:12:51Z"}, {"checksumSHA1":"WHc3uByvGaMcnSoI21fhzYgbOgg=","path":"golang.org/x/net/context/ctxhttp","revision":"075e191f18186a8ff2becaf64478e30f4545cdad","revisionTime":"2016-08-05T06:12:51Z"}, - {"checksumSHA1":"vlicYp+fe4ECQ+5QqpAk36VRA3s=","path":"golang.org/x/sys/unix","revision":"cd2c276457edda6df7fb04895d3fd6a6add42926","revisionTime":"2017-07-17T10:05:24Z"} + {"checksumSHA1":"vlicYp+fe4ECQ+5QqpAk36VRA3s=","path":"golang.org/x/sys/unix","revision":"cd2c276457edda6df7fb04895d3fd6a6add42926","revisionTime":"2017-07-17T10:05:24Z"}, + {"checksumSHA1":"vGfePfr0+weQUeTM/71mu+LCFuE=","path":"golang.org/x/time/rate","revision":"8be79e1e0910c292df4e79c241bb7e8f7e725959","revisionTime":"2017-04-24T23:28:54Z"} ], "rootPath": "github.com/hashicorp/consul" } \ No newline at end of file diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index 42fbc0f0f1..0b9de7259d 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -942,6 +942,19 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass value was unconditionally set to `false`). On agents in client-mode, this defaults to `true` and for agents in server-mode, this defaults to `false`. +* `limits` Available in Consul 0.9.3 and later, this + is a nested object that configures limits that are enforced by the agent. Currently, this only + applies to agents in client mode, not Consul servers. The following parameters are available: + + * `rpc_rate` - Configures the RPC rate + limiter by setting the maximum request rate that this agent is allowed to make for RPC + requests to Consul servers, in requests per second. Defaults to infinite, which disables + rate limiting. + * `rpc_max_burst` - The size of the token + bucket used to recharge the RPC rate limiter. Defaults to 1000 tokens, and each token is + good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket + for more details about how token bucket rate limiters operate. + * `log_level` Equivalent to the [`-log-level` command-line flag](#_log_level). diff --git a/website/source/docs/agent/telemetry.html.md b/website/source/docs/agent/telemetry.html.md index 02bb3767ca..2164f889a2 100644 --- a/website/source/docs/agent/telemetry.html.md +++ b/website/source/docs/agent/telemetry.html.md @@ -62,6 +62,18 @@ These metrics are used to monitor the health of specific Consul agents. Unit Type + + `consul.client.rpc` + This increments whenever a Consul agent in client mode makes an RPC request to a Consul server. This gives a measure of how much a given agent is loading the Consul servers. Currently, this is only generated by agents in client mode, not Consul servers. + requests + counter + + + `consul.client.rpc.exceeded` + This increments whenever a Consul agent in client mode makes an RPC request to a Consul server gets rate limited by that agent's [`limits`](/docs/agent/options.html#limits) configuration. This gives an indication that there's an abusive application making too many requests on the agent, or that the rate limit needs to be increased. Currently, this only applies to agents in client mode, not Consul servers. + rejected requests + counter + `consul.runtime.num_goroutines` This tracks the number of running goroutines and is a general load pressure indicator. This may burst from time to time but should return to a steady state value. diff --git a/website/source/docs/guides/performance.html.md b/website/source/docs/guides/performance.html.md index a107a6c0fc..a4f8d9d2b1 100644 --- a/website/source/docs/guides/performance.html.md +++ b/website/source/docs/guides/performance.html.md @@ -88,6 +88,13 @@ respect them. [stale consistency mode](/api/index.html#consistency) available to allow reads to scale across all the servers and not just be forwarded to the leader. +* In Consul 0.9.3 and later, a new [`limits`](/docs/agent/options.html#limits) configuration is +available on Consul clients to limit the RPC request rate they are allowed to make against the +Consul servers. After hitting the limit, requests will start to return rate limit errors until +time has passed and more requests are allowed. Configuring this across the cluster can help with +enforcing a max desired application load level on the servers, and can help mitigate abusive +applications. + ## Memory Requirements Consul server agents operate on a working set of data comprised of key/value