mirror of
https://github.com/status-im/consul.git
synced 2025-01-21 19:20:41 +00:00
d4c435856b
Adds automation for generating the map of `gRPC Method Name → Rate Limit Type` used by the middleware introduced in #15550, and will ensure we don't forget to add new endpoints.

Engineers must annotate their RPCs in the proto file like so:

```
rpc Foo(FooRequest) returns (FooResponse) {
  option (consul.internal.ratelimit.spec) = {
    operation_type: READ,
  };
}
```

When they run `make proto`, a protoc plugin `protoc-gen-consul-rate-limit` will be installed that writes rate-limit specs as a JSON array to a file called `.ratelimit.tmp` (one per protobuf package/directory).

After running Buf, `make proto` will execute a post-process script that will ingest all of the `.ratelimit.tmp` files and generate a Go file containing the mappings in the `agent/grpc-middleware` package. In the enterprise repository, it will write an additional file with the enterprise-only endpoints.

If an engineer forgets to add the annotation to a new RPC, the plugin will return an error like so:

```
RPC Foo is missing rate-limit specification, fix it with:

import "proto-public/annotations/ratelimit/ratelimit.proto";

service Bar {
  rpc Foo(...) returns (...) {
    option (hashicorp.consul.internal.ratelimit.spec) = {
      operation_type: OPERATION_READ | OPERATION_WRITE | OPERATION_EXEMPT,
    };
  }
}
```

In the future, this annotation can be extended to support rate-limit category (e.g. KV vs Catalog) and to determine the retry policy.
156 lines
4.2 KiB
Go
156 lines
4.2 KiB
Go
package internal
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
|
|
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
|
|
|
|
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
|
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
|
"github.com/hashicorp/consul/agent/consul/rate"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
var (
|
|
metricsLabels = []metrics.Label{{
|
|
Name: "server_type",
|
|
Value: "internal",
|
|
}}
|
|
)
|
|
|
|
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
|
|
// The register function will be called with the grpc.Server to register
|
|
// gRPC services with the server.
|
|
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter rate.RequestLimitsHandler) *Handler {
|
|
if metricsObj == nil {
|
|
metricsObj = metrics.Default()
|
|
}
|
|
|
|
// We don't need to pass tls.Config to the server since it's multiplexed
|
|
// behind the RPC listener, which already has TLS configured.
|
|
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)
|
|
|
|
opts := []grpc.ServerOption{
|
|
grpc.InTapHandle(agentmiddleware.ServerRateLimiterMiddleware(rateLimiter, agentmiddleware.NewPanicHandler(logger), logger)),
|
|
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
|
|
middleware.WithUnaryServerChain(
|
|
// Add middlware interceptors to recover in case of panics.
|
|
recovery.UnaryServerInterceptor(recoveryOpts...),
|
|
),
|
|
middleware.WithStreamServerChain(
|
|
// Add middlware interceptors to recover in case of panics.
|
|
recovery.StreamServerInterceptor(recoveryOpts...),
|
|
agentmiddleware.NewActiveStreamCounter(metricsObj, metricsLabels).Intercept,
|
|
),
|
|
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
|
MinTime: 15 * time.Second,
|
|
}),
|
|
}
|
|
|
|
// We don't need to pass tls.Config to the server since it's multiplexed
|
|
// behind the RPC listener, which already has TLS configured.
|
|
srv := grpc.NewServer(opts...)
|
|
register(srv)
|
|
|
|
lis := &chanListener{addr: addr, conns: make(chan net.Conn), done: make(chan struct{})}
|
|
return &Handler{srv: srv, listener: lis}
|
|
}
|
|
|
|
// Handler implements a handler for the rpc server listener, and the
|
|
// agent.Component interface for managing the lifecycle of the grpc.Server.
|
|
type Handler struct {
|
|
srv *grpc.Server
|
|
listener *chanListener
|
|
}
|
|
|
|
// Handle the connection by sending it to a channel for the grpc.Server to receive.
|
|
func (h *Handler) Handle(conn net.Conn) {
|
|
h.listener.conns <- conn
|
|
}
|
|
|
|
func (h *Handler) Run() error {
|
|
return h.srv.Serve(h.listener)
|
|
}
|
|
|
|
func (h *Handler) Shutdown() error {
|
|
h.srv.Stop()
|
|
return nil
|
|
}
|
|
|
|
// chanListener implements net.Listener for grpc.Server. Rather than accepting
// from a socket, it hands out connections that are sent on its conns channel.
type chanListener struct {
	// conns carries connections to Accept.
	conns chan net.Conn
	// addr is the address reported by Addr and embedded in returned errors.
	addr net.Addr
	// done is closed by Close to unblock any pending Accept.
	done chan struct{}
}

// Accept blocks until a connection arrives on conns and then returns it.
// After the listener has been closed it returns a *net.OpError with Op
// "accept" instead. Accept implements part of the net.Listener interface for
// grpc.Server.
func (l *chanListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.done:
		return nil, l.closedError("accept")
	}
}

// Addr returns the address the listener was created with. Addr implements
// part of the net.Listener interface.
func (l *chanListener) Addr() net.Addr {
	return l.addr
}

// Close marks the listener as closed, which unblocks pending and future
// Accept calls. The first call returns nil. A later call returns a
// *net.OpError with Op "close" rather than panicking on a second close of the
// done channel.
//
// The closed check and the close are not atomic, so concurrent Close calls
// are not synchronized here; callers must serialize them.
func (l *chanListener) Close() error {
	select {
	case <-l.done:
		return l.closedError("close")
	default:
		close(l.done)
		return nil
	}
}

// closedError builds the error returned by operations attempted on a closed
// listener. The network name is only filled in when an address is set, so a
// listener without an address yields an error instead of a nil dereference.
func (l *chanListener) closedError(op string) error {
	opErr := &net.OpError{
		Op:   op,
		Addr: l.addr,
		Err:  fmt.Errorf("listener closed"),
	}
	if l.addr != nil {
		opErr.Net = l.addr.Network()
	}
	return opErr
}
|
|
|
|
// NoOpHandler implements the same methods as Handler, but performs no handling.
|
|
// It may be used in place of Handler to disable the grpc server.
|
|
type NoOpHandler struct {
|
|
Logger Logger
|
|
}
|
|
|
|
// Logger is the logging interface this package depends on. Each method takes
// a message followed by variadic arguments; the call in NoOpHandler.Handle
// passes them as a key followed by its value.
type Logger interface {
	Error(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
}
|
|
|
|
func (h NoOpHandler) Handle(conn net.Conn) {
|
|
h.Logger.Error("gRPC conn opened but gRPC RPC is disabled, closing",
|
|
"conn", logConn(conn))
|
|
_ = conn.Close()
|
|
}
|
|
|
|
func (h NoOpHandler) Run() error {
|
|
return nil
|
|
}
|
|
|
|
func (h NoOpHandler) Shutdown() error {
|
|
return nil
|
|
}
|
|
|
|
// logConn formats the remote address of conn as "from=<address>" so that log
// output stays consistent. When conn is nil or has no remote address it
// returns "from=<unknown address>".
//
// It is a local copy of github.com/hashicorp/memberlist.LogConn, kept here to
// avoid pulling in a large dependency for a minor formatting function.
func logConn(conn net.Conn) string {
	const unknown = "from=<unknown address>"

	if conn != nil {
		if remote := conn.RemoteAddr(); remote != nil {
			return fmt.Sprintf("from=%s", remote.String())
		}
	}

	return unknown
}
|