server: add gRPC server for streaming events

Includes a stats handler and stream interceptor for grpc metrics.

Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
Daniel Nephin 2020-07-22 19:57:29 -04:00
parent d17a9577f8
commit 2257247095
6 changed files with 247 additions and 12 deletions

View File

@ -470,6 +470,9 @@ type Config struct {
// AutoEncrypt.Sign requests.
AutoEncryptAllowTLS bool
// TODO: godoc, set this value from Agent
EnableGRPCServer bool
// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}

View File

@ -188,6 +188,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
conn = tls.Server(conn, s.tlsConfigurator.IncomingInsecureRPCConfig())
s.handleInsecureConn(conn)
case pool.RPCGRPC:
s.grpcHandler.Handle(conn)
default:
if !s.handleEnterpriseRPCConn(typ, conn, isTLS) {
s.rpcLogger().Error("unrecognized RPC byte",
@ -254,6 +257,9 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
case pool.ALPN_RPCSnapshot:
s.handleSnapshotConn(tlsConn)
case pool.ALPN_RPCGRPC:
s.grpcHandler.Handle(conn)
case pool.ALPN_WANGossipPacket:
if err := s.handleALPN_WANGossipPacketStream(tlsConn); err != nil && err != io.EOF {
s.rpcLogger().Error(

View File

@ -26,6 +26,7 @@ import (
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
@ -239,8 +240,9 @@ type Server struct {
rpcConnLimiter connlimit.Limiter
// Listener is used to listen for incoming connections
Listener net.Listener
rpcServer *rpc.Server
Listener net.Listener
grpcHandler connHandler
rpcServer *rpc.Server
// insecureRPCServer is a RPC server that is configure with
// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
@ -314,6 +316,12 @@ type Server struct {
EnterpriseServer
}
type connHandler interface {
Run() error
Handle(conn net.Conn)
Shutdown() error
}
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
@ -603,6 +611,8 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
}
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
s.grpcHandler = newGRPCHandlerFromConfig(logger, config)
// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
@ -612,6 +622,11 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
go s.monitorLeadership()
// Start listening for RPC requests.
go func() {
if err := s.grpcHandler.Run(); err != nil {
s.logger.Error("gRPC server failed", "error", err)
}
}()
go s.listen(s.Listener)
// Start listeners for any segments with separate RPC listeners.
@ -625,6 +640,14 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
return s, nil
}
func newGRPCHandlerFromConfig(logger hclog.Logger, config *Config) connHandler {
if !config.EnableGRPCServer {
return grpc.NoOpHandler{Logger: logger}
}
return grpc.NewHandler(config.RPCAddr)
}
func (s *Server) connectCARootsMonitor(ctx context.Context) {
for {
ws := memdb.NewWatchSet()
@ -949,6 +972,12 @@ func (s *Server) Shutdown() error {
s.Listener.Close()
}
if s.grpcHandler != nil {
if err := s.grpcHandler.Shutdown(); err != nil {
s.logger.Warn("failed to stop gRPC server", "error", err)
}
}
// Close the connection pool
if s.connPool != nil {
s.connPool.Shutdown()

113
agent/grpc/handler.go Normal file
View File

@ -0,0 +1,113 @@
/*
Package grpc provides a Handler and client for agent gRPC connections.
*/
package grpc
import (
"fmt"
"net"
"google.golang.org/grpc"
)
// NewHandler returns a Handler for addr.
func NewHandler(addr net.Addr) *Handler {
conns := make(chan net.Conn)
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(
grpc.StatsHandler(&statsHandler{}),
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
)
// TODO(streaming): add gRPC services to srv here
return &Handler{
conns: conns,
srv: srv,
listener: &chanListener{addr: addr, conns: conns},
}
}
// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {
conns chan net.Conn
srv *grpc.Server
listener *chanListener
}
// Handle the connection by sending it to a channel for the grpc.Server to receive.
func (h *Handler) Handle(conn net.Conn) {
h.conns <- conn
}
func (h *Handler) Run() error {
return h.srv.Serve(h.listener)
}
func (h *Handler) Shutdown() error {
h.srv.Stop()
return nil
}
// chanListener implements net.Listener for grpc.Server.
type chanListener struct {
conns chan net.Conn
addr net.Addr
}
// Accept blocks until a connection is received from Handle, and then returns the
// connection. Accept implements part of the net.Listener interface for grpc.Server.
func (l *chanListener) Accept() (net.Conn, error) {
return <-l.conns, nil
}
func (l *chanListener) Addr() net.Addr {
return l.addr
}
// Close does nothing. The connections are managed by the caller.
func (l *chanListener) Close() error {
return nil
}
// NoOpHandler implements the same methods as Handler, but performs no handling.
// It may be used in place of Handler to disable the grpc server.
type NoOpHandler struct {
Logger Logger
}
type Logger interface {
Error(string, ...interface{})
}
func (h NoOpHandler) Handle(conn net.Conn) {
h.Logger.Error("gRPC conn opened but gRPC RPC is disabled, closing",
"conn", logConn(conn))
_ = conn.Close()
}
func (h NoOpHandler) Run() error {
return nil
}
func (h NoOpHandler) Shutdown() error {
return nil
}
// logConn is a local copy of github.com/hashicorp/memberlist.LogConn, to avoid
// a large dependency for a minor formatting function.
// logConn is used to keep log formatting consistent.
func logConn(conn net.Conn) string {
if conn == nil {
return "from=<unknown address>"
}
addr := conn.RemoteAddr()
if addr == nil {
return "from=<unknown address>"
}
return fmt.Sprintf("from=%s", addr.String())
}

82
agent/grpc/stats.go Normal file
View File

@ -0,0 +1,82 @@
package grpc
import (
"context"
"sync/atomic"
"github.com/armon/go-metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
)
// statsHandler is a grpc/stats.StatsHandler which emits connection and
// request metrics to go-metrics.
type statsHandler struct {
activeConns uint64 // must be 8-byte aligned for atomic access
}
// TagRPC implements grpcStats.StatsHandler
func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
// No-op
return ctx
}
// HandleRPC implements grpcStats.StatsHandler
func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
label := "server"
if s.IsClient() {
label = "client"
}
switch s.(type) {
case *stats.InHeader:
metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
}
}
// TagConn implements grpcStats.StatsHandler
func (c *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
// No-op
return ctx
}
// HandleConn implements grpcStats.StatsHandler
func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
label := "server"
if s.IsClient() {
label = "client"
}
var count uint64
switch s.(type) {
case *stats.ConnBegin:
count = atomic.AddUint64(&c.activeConns, 1)
case *stats.ConnEnd:
// Decrement!
count = atomic.AddUint64(&c.activeConns, ^uint64(0))
}
metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
}
type activeStreamCounter struct {
// count of the number of open streaming RPCs on a server. It is accessed
// atomically.
count uint64
}
// GRPCCountingStreamInterceptor is a grpc.ServerStreamInterceptor that emits a
// a metric of the count of open streams.
func (i *activeStreamCounter) Intercept(
srv interface{},
ss grpc.ServerStream,
_ *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
count := atomic.AddUint64(&i.count, 1)
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defer func() {
count := atomic.AddUint64(&i.count, ^uint64(0))
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
}()
return handler(srv, ss)
}

View File

@ -40,23 +40,24 @@ const (
// that is supported and it might be the only one there
// ever is.
RPCTLSInsecure = 7
RPCGRPC = 8
// RPCMaxTypeValue is the maximum rpc type byte value currently used for
// the various protocols riding over our "rpc" port.
// RPCMaxTypeValue is the maximum rpc type byte value currently used for the
// various protocols riding over our "rpc" port.
//
// Currently our 0-7 values are mutually exclusive with any valid first
// byte of a TLS header. The first TLS header byte will begin with a TLS
// content type and the values 0-19 are all explicitly unassigned and
// marked as requiring coordination. RFC 7983 does the marking and goes
// into some details about multiplexing connections and identifying TLS.
// Currently our 0-8 values are mutually exclusive with any valid first byte
// of a TLS header. The first TLS header byte will begin with a TLS content
// type and the values 0-19 are all explicitly unassigned and marked as
// requiring coordination. RFC 7983 does the marking and goes into some
// details about multiplexing connections and identifying TLS.
//
// We use this value to determine if the incoming request is actual real
// native TLS (where we can demultiplex based on ALPN protocol) or our
// older type-byte system when new connections are established.
// native TLS (where we can de-multiplex based on ALPN protocol) or our older
// type-byte system when new connections are established.
//
// NOTE: if you add new RPCTypes beyond this value, you must similarly bump
// this value.
RPCMaxTypeValue = 7
RPCMaxTypeValue = 8
)
const (
@ -66,6 +67,7 @@ const (
ALPN_RPCMultiplexV2 = "consul/rpc-multi" // RPCMultiplexV2
ALPN_RPCSnapshot = "consul/rpc-snapshot" // RPCSnapshot
ALPN_RPCGossip = "consul/rpc-gossip" // RPCGossip
ALPN_RPCGRPC = "consul/rpc-grpc" // RPCGRPC
// wan federation additions
ALPN_WANGossipPacket = "consul/wan-gossip/packet"
ALPN_WANGossipStream = "consul/wan-gossip/stream"