From 729896707058a254241afcdfd3e239cadb7900db Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Tue, 22 Mar 2022 12:40:24 +0000 Subject: [PATCH] Restructure gRPC server setup (#12586) OSS sync of enterprise changes at 0b44395e --- agent/agent.go | 45 +++++++++++-------- agent/agent_endpoint.go | 2 +- agent/consul/client_test.go | 4 +- agent/consul/leader_connect_ca_test.go | 7 +-- agent/consul/leader_test.go | 5 ++- agent/consul/rpc_test.go | 2 +- agent/consul/server.go | 11 +++-- agent/consul/server_test.go | 3 +- agent/consul/subscribe_backend.go | 2 +- agent/consul/subscribe_backend_test.go | 4 +- agent/grpc/middleware/recovery.go | 35 +++++++++++++++ agent/grpc/{ => private}/client.go | 2 +- agent/grpc/{ => private}/client_test.go | 24 +++++----- agent/grpc/{ => private}/handler.go | 38 +++------------- agent/grpc/{ => private}/handler_test.go | 8 ++-- .../internal/testservice/simple.pb.binary.go | 2 +- .../internal/testservice/simple.pb.go | 41 ++++++++--------- .../internal/testservice/simple.proto | 2 +- agent/grpc/{ => private}/resolver/registry.go | 0 agent/grpc/{ => private}/resolver/resolver.go | 0 agent/grpc/{ => private}/server_test.go | 7 ++- .../private/services}/subscribe/logger.go | 0 .../private/services}/subscribe/subscribe.go | 0 .../services}/subscribe/subscribe_test.go | 2 +- agent/grpc/{ => private}/stats.go | 2 +- agent/grpc/{ => private}/stats_test.go | 4 +- agent/grpc/public/server.go | 34 ++++++++++++++ agent/pool/pool.go | 2 +- agent/setup.go | 6 +-- agent/submatview/store_integration_test.go | 2 +- agent/xds/server.go | 30 +------------ docs/rpc/streaming/README.md | 3 +- 32 files changed, 177 insertions(+), 152 deletions(-) create mode 100644 agent/grpc/middleware/recovery.go rename agent/grpc/{ => private}/client.go (99%) rename agent/grpc/{ => private}/client_test.go (94%) rename agent/grpc/{ => private}/handler.go (78%) rename agent/grpc/{ => private}/handler_test.go (89%) rename agent/grpc/{ => private}/internal/testservice/simple.pb.binary.go (91%) rename agent/grpc/{ => private}/internal/testservice/simple.pb.go (90%) rename agent/grpc/{ => private}/internal/testservice/simple.proto (99%) rename agent/grpc/{ => private}/resolver/registry.go (100%) rename agent/grpc/{ => private}/resolver/resolver.go (100%) rename agent/grpc/{ => private}/server_test.go (98%) rename agent/{rpc => grpc/private/services}/subscribe/logger.go (100%) rename agent/{rpc => grpc/private/services}/subscribe/subscribe.go (100%) rename agent/{rpc => grpc/private/services}/subscribe/subscribe_test.go (99%) rename agent/grpc/{ => private}/stats.go (99%) rename agent/grpc/{ => private}/stats_test.go (98%) create mode 100644 agent/grpc/public/server.go diff --git a/agent/agent.go b/agent/agent.go index 6e74f5062d..e103629c2b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -36,6 +36,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/dns" + publicgrpc "github.com/hashicorp/consul/agent/grpc/public" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/rpcclient/health" @@ -206,6 +207,10 @@ type Agent struct { // depending on the configuration delegate delegate + // publicGRPCServer is the gRPC server exposed on the dedicated gRPC port (as + // opposed to the multiplexed "server" port). + publicGRPCServer *grpc.Server + // state stores a local representation of the node, // services and checks. Used for anti-entropy. State *local.State @@ -335,10 +340,6 @@ type Agent struct { // the centrally configured proxy/service defaults. serviceManager *ServiceManager - // grpcServer is the server instance used currently to serve xDS API for - // Envoy. - grpcServer *grpc.Server - // tlsConfigurator is the central instance to provide a *tls.Config // based on the current consul configuration. tlsConfigurator *tlsutil.Configurator @@ -359,6 +360,9 @@ type Agent struct { // run by the Agent routineManager *routine.Manager + // xdsServer serves the XDS protocol for configuring Envoy proxies. + xdsServer *xds.Server + // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent } @@ -493,6 +497,10 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err) } + // This needs to happen after the initial auto-config is loaded, because TLS + // can only be configured on the gRPC server at the point of creation. + a.buildPublicGRPCServer() + if err := a.startLicenseManager(ctx); err != nil { return err } @@ -530,7 +538,7 @@ func (a *Agent) Start(ctx context.Context) error { // Setup either the client or the server. if c.ServerMode { - server, err := consul.NewServer(consulCfg, a.baseDeps.Deps) + server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.publicGRPCServer) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } @@ -700,12 +708,21 @@ func (a *Agent) Failed() <-chan struct{} { return a.apiServers.failed } +func (a *Agent) buildPublicGRPCServer() { + // TLS is only enabled on the gRPC server if there's an HTTPS port configured. + var tls *tlsutil.Configurator + if a.config.HTTPSPort > 0 { + tls = a.tlsConfigurator + } + a.publicGRPCServer = publicgrpc.NewServer(a.logger.Named("grpc.public"), tls) +} + func (a *Agent) listenAndServeGRPC() error { if len(a.config.GRPCAddrs) < 1 { return nil } - xdsServer := xds.NewServer( + a.xdsServer = xds.NewServer( a.logger.Named(logging.Envoy), a.config.ConnectServerlessPluginEnabled, a.proxyConfig, @@ -715,15 +732,7 @@ func (a *Agent) listenAndServeGRPC() error { a, a, ) - - tlsConfig := a.tlsConfigurator - // gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled - // then gRPC should not use TLS. - if a.config.HTTPSPort <= 0 { - tlsConfig = nil - } - var err error - a.grpcServer = xds.NewGRPCServer(xdsServer, tlsConfig) + a.xdsServer.Register(a.publicGRPCServer) ln, err := a.startListeners(a.config.GRPCAddrs) if err != nil { @@ -736,7 +745,7 @@ func (a *Agent) listenAndServeGRPC() error { "address", innerL.Addr().String(), "network", innerL.Addr().Network(), ) - err := a.grpcServer.Serve(innerL) + err := a.publicGRPCServer.Serve(innerL) if err != nil { a.logger.Error("gRPC server failed", "error", err) } @@ -1403,9 +1412,7 @@ func (a *Agent) ShutdownAgent() error { } // Stop gRPC - if a.grpcServer != nil { - a.grpcServer.Stop() - } + a.publicGRPCServer.Stop() // Stop the proxy config manager if a.proxyConfig != nil { diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 781ccddc65..67158f87a6 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -73,7 +73,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i } var xds *XDSSelf - if s.agent.grpcServer != nil { + if s.agent.xdsServer != nil { xds = &XDSSelf{ SupportedProxies: map[string][]string{ "envoy": proxysupport.EnvoyVersions, diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index f077f5d5fb..06ef80efeb 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -17,8 +17,8 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" - "github.com/hashicorp/consul/agent/grpc" - "github.com/hashicorp/consul/agent/grpc/resolver" + grpc "github.com/hashicorp/consul/agent/grpc/private" + "github.com/hashicorp/consul/agent/grpc/private/resolver" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index 1b4eaf38fd..2ebbda0a67 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -16,12 +16,13 @@ import ( "testing" "time" - msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" - "github.com/hashicorp/consul-net-rpc/net/rpc" vaultapi "github.com/hashicorp/vault/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" + "github.com/hashicorp/consul-net-rpc/net/rpc" + "github.com/hashicorp/consul/agent/connect" ca "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/fsm" @@ -549,7 +550,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) { deps := newDefaultDeps(t, conf1) deps.Logger = logger - s1, err := NewServer(conf1, deps) + s1, err := NewServer(conf1, deps, nil) require.NoError(t, err) defer s1.Shutdown() testrpc.WaitForLeader(t, s1.RPC, "dc1") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 0d17d60ece..189c058b9c 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -9,11 +9,12 @@ import ( "testing" "time" - msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/go-hclog" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" + msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" + "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -1527,7 +1528,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) { deps := newDefaultDeps(t, config) deps.Logger = logger - srv, err := NewServer(config, deps) + srv, err := NewServer(config, deps, nil) require.NoError(t, err) defer srv.Shutdown() diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 0f37d4b6fd..d8bb45241a 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -32,7 +32,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" - agent_grpc "github.com/hashicorp/consul/agent/grpc" + agent_grpc "github.com/hashicorp/consul/agent/grpc/private" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" diff --git a/agent/consul/server.go b/agent/consul/server.go index 96db9fa1ad..ba77c45171 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -41,11 +41,11 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/consul/wanfed" - agentgrpc "github.com/hashicorp/consul/agent/grpc" + agentgrpc "github.com/hashicorp/consul/agent/grpc/private" + "github.com/hashicorp/consul/agent/grpc/private/services/subscribe" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" - "github.com/hashicorp/consul/agent/rpc/subscribe" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" @@ -235,6 +235,10 @@ type Server struct { // is only ever closed. leaveCh chan struct{} + // publicGRPCServer is the gRPC server exposed on the dedicated gRPC port, as + // opposed to the multiplexed "server" port which is served by grpcHandler. + publicGRPCServer *grpc.Server + // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. router *router.Router @@ -345,7 +349,7 @@ type connHandler interface { // NewServer is used to construct a new Consul server from the configuration // and extra options, potentially returning an error. -func NewServer(config *Config, flat Deps) (*Server, error) { +func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Server, error) { logger := flat.Logger if err := config.CheckProtocolVersion(); err != nil { return nil, err @@ -388,6 +392,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { rpcServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))), insecureRPCServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))), tlsConfigurator: flat.TLSConfigurator, + publicGRPCServer: publicGRPCServer, reassertLeaderCh: make(chan chan error), sessionTimers: NewSessionTimers(), tombstoneGC: gc, diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 90d6d64404..bf7ff0ab6b 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -67,7 +67,6 @@ func testTLSCertificates(serverName string) (cert string, key string, cacert str return cert, privateKey, ca, nil } -// testServerACLConfig setup some common ACL configurations. func testServerACLConfig(c *Config) { c.PrimaryDatacenter = "dc1" c.ACLsEnabled = true @@ -264,7 +263,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) { } } - srv, err := NewServer(c, newDefaultDeps(t, c)) + srv, err := NewServer(c, newDefaultDeps(t, c), nil) if err != nil { return nil, err } diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index 8b6eddd847..8dc2d3cb2c 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -5,7 +5,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/rpc/subscribe" + "github.com/hashicorp/consul/agent/grpc/private/services/subscribe" "github.com/hashicorp/consul/agent/structs" ) diff --git a/agent/consul/subscribe_backend_test.go b/agent/consul/subscribe_backend_test.go index 5235f0b96c..7a22eace5e 100644 --- a/agent/consul/subscribe_backend_test.go +++ b/agent/consul/subscribe_backend_test.go @@ -14,8 +14,8 @@ import ( "golang.org/x/sync/errgroup" gogrpc "google.golang.org/grpc" - grpc "github.com/hashicorp/consul/agent/grpc" - "github.com/hashicorp/consul/agent/grpc/resolver" + grpc "github.com/hashicorp/consul/agent/grpc/private" + "github.com/hashicorp/consul/agent/grpc/private/resolver" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbservice" diff --git a/agent/grpc/middleware/recovery.go b/agent/grpc/middleware/recovery.go new file mode 100644 index 0000000000..c376b8a153 --- /dev/null +++ b/agent/grpc/middleware/recovery.go @@ -0,0 +1,35 @@ +package middleware + +import ( + recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// PanicHandlerMiddlewareOpts returns the []recovery.Option containing +// recovery handler function. +func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option { + return []recovery.Option{ + recovery.WithRecoveryHandler(NewPanicHandler(logger)), + } +} + +// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function +// to handle panic in GRPC server's handlers. +func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc { + return func(p interface{}) (err error) { + // Log the panic and the stack trace of the Goroutine that caused the panic. + stacktrace := hclog.Stacktrace() + logger.Error("panic serving grpc request", + "panic", p, + "stack", stacktrace, + ) + + return status.Errorf(codes.Internal, "grpc: panic serving request") + } +} + +type Logger interface { + Error(string, ...interface{}) +} diff --git a/agent/grpc/client.go b/agent/grpc/private/client.go similarity index 99% rename from agent/grpc/client.go rename to agent/grpc/private/client.go index 9afd6becd4..8d10edd175 100644 --- a/agent/grpc/client.go +++ b/agent/grpc/private/client.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "context" diff --git a/agent/grpc/client_test.go b/agent/grpc/private/client_test.go similarity index 94% rename from agent/grpc/client_test.go rename to agent/grpc/private/client_test.go index 371452ff43..d414207abe 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/private/client_test.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "context" @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/grpc/internal/testservice" - "github.com/hashicorp/consul/agent/grpc/resolver" + "github.com/hashicorp/consul/agent/grpc/private/internal/testservice" + "github.com/hashicorp/consul/agent/grpc/private/resolver" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" @@ -145,9 +145,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) { tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{ InternalRPC: tlsutil.ProtocolConfig{ VerifyIncoming: true, - CAFile: "../../test/hostname/CertAuth.crt", - CertFile: "../../test/hostname/Alice.crt", - KeyFile: "../../test/hostname/Alice.key", + CAFile: "../../../test/hostname/CertAuth.crt", + CertFile: "../../../test/hostname/Alice.crt", + KeyFile: "../../../test/hostname/Alice.key", VerifyOutgoing: true, }, }, hclog.New(nil)) @@ -192,9 +192,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T) tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{ InternalRPC: tlsutil.ProtocolConfig{ VerifyIncoming: true, - CAFile: "../../test/hostname/CertAuth.crt", - CertFile: "../../test/hostname/Bob.crt", - KeyFile: "../../test/hostname/Bob.key", + CAFile: "../../../test/hostname/CertAuth.crt", + CertFile: "../../../test/hostname/Bob.crt", + KeyFile: "../../../test/hostname/Bob.key", VerifyOutgoing: true, VerifyServerHostname: true, }, @@ -222,9 +222,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T) clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{ InternalRPC: tlsutil.ProtocolConfig{ VerifyIncoming: true, - CAFile: "../../test/hostname/CertAuth.crt", - CertFile: "../../test/hostname/Betty.crt", - KeyFile: "../../test/hostname/Betty.key", + CAFile: "../../../test/hostname/CertAuth.crt", + CertFile: "../../../test/hostname/Betty.crt", + KeyFile: "../../../test/hostname/Betty.key", VerifyOutgoing: true, VerifyServerHostname: true, }, diff --git a/agent/grpc/handler.go b/agent/grpc/private/handler.go similarity index 78% rename from agent/grpc/handler.go rename to agent/grpc/private/handler.go index 886186dd80..3cc103af28 100644 --- a/agent/grpc/handler.go +++ b/agent/grpc/private/handler.go @@ -1,21 +1,16 @@ -/* -Package grpc provides a Handler and client for agent gRPC connections. -*/ -package grpc +package private import ( "fmt" "net" "time" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/status" + agentmiddleware "github.com/hashicorp/consul/agent/grpc/middleware" middleware "github.com/grpc-ecosystem/go-grpc-middleware" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) // NewHandler returns a gRPC server that accepts connections from Handle(conn). @@ -26,7 +21,7 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server) // We don't need to pass tls.Config to the server since it's multiplexed // behind the RPC listener, which already has TLS configured. - recoveryOpts := PanicHandlerMiddlewareOpts(logger) + recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger) opts := []grpc.ServerOption{ grpc.StatsHandler(newStatsHandler(metrics)), @@ -53,29 +48,6 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server) return &Handler{srv: srv, listener: lis} } -// PanicHandlerMiddlewareOpts returns the []recovery.Option containing -// recovery handler function. -func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option { - return []recovery.Option{ - recovery.WithRecoveryHandler(NewPanicHandler(logger)), - } -} - -// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function -// to handle panic in GRPC server's handlers. -func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc { - return func(p interface{}) (err error) { - // Log the panic and the stack trace of the Goroutine that caused the panic. - stacktrace := hclog.Stacktrace() - logger.Error("panic serving grpc request", - "panic", p, - "stack", stacktrace, - ) - - return status.Errorf(codes.Internal, "grpc: panic serving request") - } -} - // Handler implements a handler for the rpc server listener, and the // agent.Component interface for managing the lifecycle of the grpc.Server. type Handler struct { diff --git a/agent/grpc/handler_test.go b/agent/grpc/private/handler_test.go similarity index 89% rename from agent/grpc/handler_test.go rename to agent/grpc/private/handler_test.go index 46faa9696f..bb1a7f4144 100644 --- a/agent/grpc/handler_test.go +++ b/agent/grpc/private/handler_test.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "bytes" @@ -13,8 +13,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/hashicorp/consul/agent/grpc/internal/testservice" - "github.com/hashicorp/consul/agent/grpc/resolver" + "github.com/hashicorp/consul/agent/grpc/private/internal/testservice" + "github.com/hashicorp/consul/agent/grpc/private/resolver" ) func TestHandler_PanicRecoveryInterceptor(t *testing.T) { @@ -57,5 +57,5 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) { // Checking the entire stack trace is not possible, let's // make sure that it contains a couple of expected strings. require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`) - require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc.(*simplePanic).Something`) + require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc/private.(*simplePanic).Something`) } diff --git a/agent/grpc/internal/testservice/simple.pb.binary.go b/agent/grpc/private/internal/testservice/simple.pb.binary.go similarity index 91% rename from agent/grpc/internal/testservice/simple.pb.binary.go rename to agent/grpc/private/internal/testservice/simple.pb.binary.go index ef203aaa64..2d65084bd8 100644 --- a/agent/grpc/internal/testservice/simple.pb.binary.go +++ b/agent/grpc/private/internal/testservice/simple.pb.binary.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go-binary. DO NOT EDIT. -// source: agent/grpc/internal/testservice/simple.proto +// source: agent/grpc/private/internal/testservice/simple.proto package testservice diff --git a/agent/grpc/internal/testservice/simple.pb.go b/agent/grpc/private/internal/testservice/simple.pb.go similarity index 90% rename from agent/grpc/internal/testservice/simple.pb.go rename to agent/grpc/private/internal/testservice/simple.pb.go index d1e07d8128..dc2835664f 100644 --- a/agent/grpc/internal/testservice/simple.pb.go +++ b/agent/grpc/private/internal/testservice/simple.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: agent/grpc/internal/testservice/simple.proto +// source: agent/grpc/private/internal/testservice/simple.proto package testservice @@ -37,7 +37,7 @@ func (m *Req) Reset() { *m = Req{} } func (m *Req) String() string { return proto.CompactTextString(m) } func (*Req) ProtoMessage() {} func (*Req) Descriptor() ([]byte, []int) { - return fileDescriptor_3009a77c573f826d, []int{0} + return fileDescriptor_98af0751f806f450, []int{0} } func (m *Req) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -85,7 +85,7 @@ func (m *Resp) Reset() { *m = Resp{} } func (m *Resp) String() string { return proto.CompactTextString(m) } func (*Resp) ProtoMessage() {} func (*Resp) Descriptor() ([]byte, []int) { - return fileDescriptor_3009a77c573f826d, []int{1} + return fileDescriptor_98af0751f806f450, []int{1} } func (m *Resp) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -134,24 +134,25 @@ func init() { } func init() { - proto.RegisterFile("agent/grpc/internal/testservice/simple.proto", fileDescriptor_3009a77c573f826d) + proto.RegisterFile("agent/grpc/private/internal/testservice/simple.proto", fileDescriptor_98af0751f806f450) } -var fileDescriptor_3009a77c573f826d = []byte{ - // 206 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x49, 0x4c, 0x4f, 0xcd, - 0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, - 0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d, - 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62, - 0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72, 0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96, - 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x72, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e, - 0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88, - 0xa0, 0x99, 0xc3, 0x84, 0x6e, 0x8e, 0x51, 0x2e, 0x17, 0x5b, 0x30, 0xd8, 0x2d, 0x42, 0x46, 0x5c, - 0x9c, 0xc1, 0xf9, 0xb9, 0xa9, 0x25, 0x19, 0x99, 0x79, 0xe9, 0x42, 0x02, 0x7a, 0x48, 0x6e, 0xd2, - 0x0b, 0x4a, 0x2d, 0x94, 0x12, 0x44, 0x13, 0x29, 0x2e, 0x50, 0x62, 0x10, 0xd2, 0xe7, 0x62, 0x71, - 0xcb, 0xc9, 0x2f, 0x27, 0x52, 0xb9, 0x01, 0xa3, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, - 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0x38, 0x0c, 0x8c, - 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x4b, 0x16, 0x40, 0x33, 0x01, 0x00, 0x00, +var fileDescriptor_98af0751f806f450 = []byte{ + // 214 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x49, 0x4c, 0x4f, 0xcd, + 0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2c, 0x4b, 0x2c, 0x49, 0xd5, 0xcf, + 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a, + 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, + 0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62, 0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72, + 0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, + 0x51, 0x72, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e, 0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, + 0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88, 0xa0, 0x99, 0xc3, 0x84, 0x6e, 0x8e, 0x51, 0x2e, + 0x17, 0x5b, 0x30, 0xd8, 0x2d, 0x42, 0x46, 0x5c, 0x9c, 0xc1, 0xf9, 0xb9, 0xa9, 0x25, 0x19, 0x99, + 0x79, 0xe9, 0x42, 0x02, 0x7a, 0x48, 0x6e, 0xd2, 0x0b, 0x4a, 0x2d, 0x94, 0x12, 0x44, 0x13, 0x29, + 0x2e, 0x50, 0x62, 0x10, 0xd2, 0xe7, 0x62, 0x71, 0xcb, 0xc9, 0x2f, 0x27, 0x52, 0xb9, 0x01, 0xa3, + 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, + 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0x38, 0x0c, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x76, 0xce, + 0x88, 0x7d, 0x3b, 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -295,7 +296,7 @@ var _Simple_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, }, }, - Metadata: "agent/grpc/internal/testservice/simple.proto", + Metadata: "agent/grpc/private/internal/testservice/simple.proto", } func (m *Req) Marshal() (dAtA []byte, err error) { diff --git a/agent/grpc/internal/testservice/simple.proto b/agent/grpc/private/internal/testservice/simple.proto similarity index 99% rename from agent/grpc/internal/testservice/simple.proto rename to agent/grpc/private/internal/testservice/simple.proto index 4077d9bcd1..9773df134c 100644 --- a/agent/grpc/internal/testservice/simple.proto +++ b/agent/grpc/private/internal/testservice/simple.proto @@ -15,4 +15,4 @@ message Req { message Resp { string ServerName = 1; string Datacenter = 2; -} \ No newline at end of file +} diff --git a/agent/grpc/resolver/registry.go b/agent/grpc/private/resolver/registry.go similarity index 100% rename from agent/grpc/resolver/registry.go rename to agent/grpc/private/resolver/registry.go diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/private/resolver/resolver.go similarity index 100% rename from agent/grpc/resolver/resolver.go rename to agent/grpc/private/resolver/resolver.go diff --git a/agent/grpc/server_test.go b/agent/grpc/private/server_test.go similarity index 98% rename from agent/grpc/server_test.go rename to agent/grpc/private/server_test.go index 043eb45003..c9873c2226 100644 --- a/agent/grpc/server_test.go +++ b/agent/grpc/private/server_test.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "context" @@ -10,13 +10,12 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "github.com/hashicorp/go-hclog" - - "github.com/hashicorp/consul/agent/grpc/internal/testservice" + "github.com/hashicorp/consul/agent/grpc/private/internal/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/tlsutil" diff --git a/agent/rpc/subscribe/logger.go b/agent/grpc/private/services/subscribe/logger.go similarity index 100% rename from agent/rpc/subscribe/logger.go rename to agent/grpc/private/services/subscribe/logger.go diff --git a/agent/rpc/subscribe/subscribe.go b/agent/grpc/private/services/subscribe/subscribe.go similarity index 100% rename from agent/rpc/subscribe/subscribe.go rename to agent/grpc/private/services/subscribe/subscribe.go diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/grpc/private/services/subscribe/subscribe_test.go similarity index 99% rename from agent/rpc/subscribe/subscribe_test.go rename to agent/grpc/private/services/subscribe/subscribe_test.go index 0c928b4203..b11438d7e3 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/grpc/private/services/subscribe/subscribe_test.go @@ -21,7 +21,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/grpc" + grpc "github.com/hashicorp/consul/agent/grpc/private" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbcommon" diff --git a/agent/grpc/stats.go b/agent/grpc/private/stats.go similarity index 99% rename from agent/grpc/stats.go rename to agent/grpc/private/stats.go index 8af6ec4005..76293e8511 100644 --- a/agent/grpc/stats.go +++ b/agent/grpc/private/stats.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "context" diff --git a/agent/grpc/stats_test.go b/agent/grpc/private/stats_test.go similarity index 98% rename from agent/grpc/stats_test.go rename to agent/grpc/private/stats_test.go index 079de34086..78e63647eb 100644 --- a/agent/grpc/stats_test.go +++ b/agent/grpc/private/stats_test.go @@ -1,4 +1,4 @@ -package grpc +package private import ( "context" @@ -14,7 +14,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "github.com/hashicorp/consul/agent/grpc/internal/testservice" + "github.com/hashicorp/consul/agent/grpc/private/internal/testservice" "github.com/hashicorp/go-hclog" ) diff --git a/agent/grpc/public/server.go b/agent/grpc/public/server.go new file mode 100644 index 0000000000..c235fbd092 --- /dev/null +++ b/agent/grpc/public/server.go @@ -0,0 +1,34 @@ +package public + +import ( + middleware "github.com/grpc-ecosystem/go-grpc-middleware" + recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + agentmiddleware "github.com/hashicorp/consul/agent/grpc/middleware" + "github.com/hashicorp/consul/tlsutil" +) + +// NewServer constructs a gRPC server for the public gRPC port, to which +// handlers can be registered. +func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.Server { + recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger) + + opts := []grpc.ServerOption{ + grpc.MaxConcurrentStreams(2048), + middleware.WithUnaryServerChain( + // Add middlware interceptors to recover in case of panics. + recovery.UnaryServerInterceptor(recoveryOpts...), + ), + middleware.WithStreamServerChain( + // Add middlware interceptors to recover in case of panics. + recovery.StreamServerInterceptor(recoveryOpts...), + ), + } + if tls != nil && tls.GRPCTLSConfigured() { + creds := credentials.NewTLS(tls.IncomingGRPCConfig()) + opts = append(opts, grpc.Creds(creds)) + } + return grpc.NewServer(opts...) +} diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 1f21850e30..179565dcf3 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -412,7 +412,7 @@ func DialRPCViaMeshGateway( } if nextProto != ALPN_RPCGRPC { - // agent/grpc/client.go:dial() handles this in another way for gRPC + // agent/grpc/private/client.go:dial() handles this in another way for gRPC if tcp, ok := rawConn.(*net.TCPConn); ok { _ = tcp.SetKeepAlive(true) _ = tcp.SetNoDelay(true) diff --git a/agent/setup.go b/agent/setup.go index 4b8452fc57..bf67c0360f 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -20,12 +20,11 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/usagemetrics" - "github.com/hashicorp/consul/agent/grpc" - "github.com/hashicorp/consul/agent/grpc/resolver" + grpc "github.com/hashicorp/consul/agent/grpc/private" + "github.com/hashicorp/consul/agent/grpc/private/resolver" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" - "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds" @@ -265,7 +264,6 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau grpc.StatsCounters, local.StateCounters, raftCounters, - middleware.NewRPCCounters, } // Flatten definitions // NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique? diff --git a/agent/submatview/store_integration_test.go b/agent/submatview/store_integration_test.go index c8b6c21a8a..b6e6295438 100644 --- a/agent/submatview/store_integration_test.go +++ b/agent/submatview/store_integration_test.go @@ -22,7 +22,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/rpc/subscribe" + "github.com/hashicorp/consul/agent/grpc/private/services/subscribe" "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/submatview" diff --git a/agent/xds/server.go b/agent/xds/server.go index 86aa6ec73f..d385ac8638 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -7,24 +7,19 @@ import ( "time" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - middleware "github.com/grpc-ecosystem/go-grpc-middleware" - recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" - agentgrpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/xdscommon" - "github.com/hashicorp/consul/tlsutil" ) var StatsGauges = []prometheus.GaugeDefinition{ @@ -206,30 +201,9 @@ func tokenFromContext(ctx context.Context) string { return "" } -// NewGRPCServer creates a grpc.Server, registers the Server, and then returns -// the grpc.Server. -func NewGRPCServer(s *Server, tlsConfigurator *tlsutil.Configurator) *grpc.Server { - recoveryOpts := agentgrpc.PanicHandlerMiddlewareOpts(s.Logger) - - opts := []grpc.ServerOption{ - grpc.MaxConcurrentStreams(2048), - middleware.WithUnaryServerChain( - // Add middlware interceptors to recover in case of panics. - recovery.UnaryServerInterceptor(recoveryOpts...), - ), - middleware.WithStreamServerChain( - // Add middlware interceptors to recover in case of panics. - recovery.StreamServerInterceptor(recoveryOpts...), - ), - } - if tlsConfigurator != nil && tlsConfigurator.GRPCTLSConfigured() { - creds := credentials.NewTLS(tlsConfigurator.IncomingGRPCConfig()) - opts = append(opts, grpc.Creds(creds)) - } - srv := grpc.NewServer(opts...) +// Register the XDS server handlers to the given gRPC server. +func (s *Server) Register(srv *grpc.Server) { envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) - - return srv } // authorize the xDS request using the token stored in ctx. This authorization is diff --git a/docs/rpc/streaming/README.md b/docs/rpc/streaming/README.md index 121b72851c..3cd2ca6feb 100644 --- a/docs/rpc/streaming/README.md +++ b/docs/rpc/streaming/README.md @@ -34,7 +34,7 @@ and sent to any active subscriptions. [rpcclient/health.Health]: https://github.com/hashicorp/consul/blob/main/agent/rpcclient/health/health.go [StreamingHealthServices cache-type]: https://github.com/hashicorp/consul/blob/main/agent/cache-types/streaming_health_services.go [materialized view]: https://github.com/hashicorp/consul/blob/main/agent/submatview/materializer.go -[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/main/agent/rpc/subscribe/subscribe.go +[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/main/agent/grpc/private/services/subscribe/subscribe.go [EventPublisher]: https://github.com/hashicorp/consul/blob/main/agent/consul/stream/event_publisher.go [state.Store commits]: https://github.com/hashicorp/consul/blob/main/agent/consul/state/memdb.go @@ -98,4 +98,3 @@ and filtering happens. ![event filtering](./event-filtering.svg) [source](./event-filtering.mmd) -