server: add placeholder glue for rate limit handler (#15539)

Adds a no-op implementation of the rate-limit handler and exposes
it on the consul.Server struct.

It allows us to start working on the net/rpc and gRPC interceptors
and config (re)loading logic, without having to implement the full
handler up-front.

Co-authored-by: John Murret <john.murret@hashicorp.com>
Co-authored-by: Dhia Ayachi <dhia@hashicorp.com>
This commit is contained in:
Dan Upton 2022-12-13 11:41:54 +00:00 committed by GitHub
parent 06ce35d480
commit eef38c2199
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 168 additions and 2 deletions

View File

@ -3,11 +3,12 @@ package multilimiter
import ( import (
"bytes" "bytes"
"context" "context"
radix "github.com/hashicorp/go-immutable-radix"
"golang.org/x/time/rate"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
radix "github.com/hashicorp/go-immutable-radix"
"golang.org/x/time/rate"
) )
var _ RateLimiter = &MultiLimiter{} var _ RateLimiter = &MultiLimiter{}

View File

@ -0,0 +1,151 @@
// package rate implements server-side RPC rate limiting.
package rate
import (
"context"
"errors"
"net"
"sync/atomic"
"github.com/hashicorp/consul/agent/consul/multilimiter"
)
var (
// ErrRetryElsewhere indicates that the operation was not allowed because the
// rate limit was exhausted, but may succeed on a different server.
//
// Results in a RESOURCE_EXHAUSTED or "429 Too Many Requests" response.
ErrRetryElsewhere = errors.New("rate limit exceeded, try a different server")
// ErrRetryLater indicates that the operation was not allowed because the rate
// limit was exhausted, and trying a different server won't help (e.g. because
// the operation can only be performed on the leader).
//
// Results in an UNAVAILABLE or "503 Service Unavailable" response.
ErrRetryLater = errors.New("rate limit exceeded, try again later")
)
// Mode determines the action that will be taken when a rate limit has been
// exhausted (e.g. log and allow, or reject).
type Mode int
const (
// ModePermissive causes the handler to log the rate-limited operation but
// still allow it to proceed.
ModePermissive Mode = iota
// ModeEnforcing causes the handler to reject the rate-limted operation.
ModeEnforcing
)
// OperationType is the type of operation the client is attempting to perform.
type OperationType int
const (
// OperationTypeRead represents a read operation.
OperationTypeRead OperationType = iota
// OperationTypeWrite represents a write operation.
OperationTypeWrite
)
// Operation the client is attempting to perform.
type Operation struct {
// Name of the RPC endpoint (e.g. "Foo.Bar" for net/rpc and "/foo.service/Bar" for gRPC).
Name string
// SourceAddr is the client's (or forwarding server's) IP address.
SourceAddr net.Addr
// Type of operation to be performed (e.g. read or write).
Type OperationType
}
// Handler enforces rate limits for incoming RPCs.
type Handler struct {
cfg *atomic.Pointer[HandlerConfig]
delegate HandlerDelegate
limiter multilimiter.RateLimiter
}
type HandlerConfig struct {
multilimiter.Config
// GlobalMode configures the action that will be taken when a global rate-limit
// has been exhausted.
//
// Note: in the future there'll be a separate Mode for IP-based limits.
GlobalMode Mode
// GlobalWriteConfig configures the global rate limiter for write operations.
GlobalWriteConfig multilimiter.LimiterConfig
// GlobalReadConfig configures the global rate limiter for read operations.
GlobalReadConfig multilimiter.LimiterConfig
}
type HandlerDelegate interface {
// IsLeader is used to determine whether the operation is being performed
// against the cluster leader, such that if it can _only_ be performed by
// the leader (e.g. write operations) we don't tell clients to retry against
// a different server.
IsLeader() bool
}
// NewHandler creates a new RPC rate limit handler.
func NewHandler(cfg HandlerConfig, delegate HandlerDelegate) *Handler {
limiter := multilimiter.NewMultiLimiter(cfg.Config)
limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite)
limiter.UpdateConfig(cfg.GlobalReadConfig, globalRead)
h := &Handler{
cfg: new(atomic.Pointer[HandlerConfig]),
delegate: delegate,
limiter: limiter,
}
h.cfg.Store(&cfg)
return h
}
// Run the limiter cleanup routine until the given context is canceled.
//
// Note: this starts a goroutine.
func (h *Handler) Run(ctx context.Context) {
h.limiter.Run(ctx)
}
// Allow returns an error if the given operation is not allowed to proceed
// because of an exhausted rate-limit.
func (h *Handler) Allow(op Operation) error {
// TODO(NET-1383): actually implement the rate limiting logic.
//
// Example:
// if !h.limiter.Allow(globalWrite) {
// }
return nil
}
// TODO(NET-1379): call this on `consul reload`.
func (h *Handler) UpdateConfig(cfg HandlerConfig) {
h.cfg.Store(&cfg)
h.limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite)
h.limiter.UpdateConfig(cfg.GlobalReadConfig, globalRead)
}
var (
// globalWrite identifies the global rate limit applied to write operations.
globalWrite = globalLimit("global.write")
// globalRead identifies the global rate limit applied to read operations.
globalRead = globalLimit("global.read")
)
// globalLimit represents a limit that applies to all writes or reads.
type globalLimit []byte
// Key satisfies the multilimiter.LimitedEntity interface.
func (prefix globalLimit) Key() multilimiter.KeyType {
return multilimiter.Key(prefix, nil)
}

View File

@ -5,6 +5,7 @@ import (
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt" "fmt"
"github.com/hashicorp/consul/agent/consul/multilimiter"
"io" "io"
"net" "net"
"os" "os"
@ -33,6 +34,7 @@ import (
"github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/fsm"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/consul/usagemetrics"
@ -275,6 +277,9 @@ type Server struct {
grpcHandler connHandler grpcHandler connHandler
rpcServer *rpc.Server rpcServer *rpc.Server
// incomingRPCLimiter rate-limits incoming net/rpc and gRPC calls.
incomingRPCLimiter *rpcRate.Handler
// insecureRPCServer is a RPC server that is configure with // insecureRPCServer is a RPC server that is configure with
// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign // IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
// to request client certificates. At this point a client doesn't have // to request client certificates. At this point a client doesn't have
@ -462,6 +467,15 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
Logger: logger.Named("hcp_manager"), Logger: logger.Named("hcp_manager"),
}) })
// TODO(NET-1380, NET-1381): thread this into the net/rpc and gRPC interceptors.
s.incomingRPCLimiter = rpcRate.NewHandler(rpcRate.HandlerConfig{
// TODO(server-rate-limit): revisit those value based on the multilimiter final implementation
Config: multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second},
// TODO(NET-1379): pass in _real_ configuration.
GlobalMode: rpcRate.ModePermissive,
}, s)
s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
var recorder *middleware.RequestRecorder var recorder *middleware.RequestRecorder
if flat.NewRequestRecorderFunc != nil { if flat.NewRequestRecorderFunc != nil {
recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter) recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter)