Restructure gRPC server setup (#12586)

OSS sync of enterprise changes at 0b44395e
This commit is contained in:
Dan Upton 2022-03-22 12:40:24 +00:00 committed by GitHub
parent e5ebc47a94
commit 7298967070
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 177 additions and 152 deletions

View File

@ -36,6 +36,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/dns"
publicgrpc "github.com/hashicorp/consul/agent/grpc/public"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/health"
@ -206,6 +207,10 @@ type Agent struct {
// depending on the configuration
delegate delegate
// publicGRPCServer is the gRPC server exposed on the dedicated gRPC port (as
// opposed to the multiplexed "server" port).
publicGRPCServer *grpc.Server
// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
State *local.State
@ -335,10 +340,6 @@ type Agent struct {
// the centrally configured proxy/service defaults.
serviceManager *ServiceManager
// grpcServer is the server instance used currently to serve xDS API for
// Envoy.
grpcServer *grpc.Server
// tlsConfigurator is the central instance to provide a *tls.Config
// based on the current consul configuration.
tlsConfigurator *tlsutil.Configurator
@ -359,6 +360,9 @@ type Agent struct {
// run by the Agent
routineManager *routine.Manager
// xdsServer serves the XDS protocol for configuring Envoy proxies.
xdsServer *xds.Server
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
@ -493,6 +497,10 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
}
// This needs to happen after the initial auto-config is loaded, because TLS
// can only be configured on the gRPC server at the point of creation.
a.buildPublicGRPCServer()
if err := a.startLicenseManager(ctx); err != nil {
return err
}
@ -530,7 +538,7 @@ func (a *Agent) Start(ctx context.Context) error {
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.publicGRPCServer)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
@ -700,12 +708,21 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}
func (a *Agent) buildPublicGRPCServer() {
// TLS is only enabled on the gRPC server if there's an HTTPS port configured.
var tls *tlsutil.Configurator
if a.config.HTTPSPort > 0 {
tls = a.tlsConfigurator
}
a.publicGRPCServer = publicgrpc.NewServer(a.logger.Named("grpc.public"), tls)
}
func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 {
return nil
}
xdsServer := xds.NewServer(
a.xdsServer = xds.NewServer(
a.logger.Named(logging.Envoy),
a.config.ConnectServerlessPluginEnabled,
a.proxyConfig,
@ -715,15 +732,7 @@ func (a *Agent) listenAndServeGRPC() error {
a,
a,
)
tlsConfig := a.tlsConfigurator
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled
// then gRPC should not use TLS.
if a.config.HTTPSPort <= 0 {
tlsConfig = nil
}
var err error
a.grpcServer = xds.NewGRPCServer(xdsServer, tlsConfig)
a.xdsServer.Register(a.publicGRPCServer)
ln, err := a.startListeners(a.config.GRPCAddrs)
if err != nil {
@ -736,7 +745,7 @@ func (a *Agent) listenAndServeGRPC() error {
"address", innerL.Addr().String(),
"network", innerL.Addr().Network(),
)
err := a.grpcServer.Serve(innerL)
err := a.publicGRPCServer.Serve(innerL)
if err != nil {
a.logger.Error("gRPC server failed", "error", err)
}
@ -1403,9 +1412,7 @@ func (a *Agent) ShutdownAgent() error {
}
// Stop gRPC
if a.grpcServer != nil {
a.grpcServer.Stop()
}
a.publicGRPCServer.Stop()
// Stop the proxy config manager
if a.proxyConfig != nil {

View File

@ -73,7 +73,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
}
var xds *XDSSelf
if s.agent.grpcServer != nil {
if s.agent.xdsServer != nil {
xds = &XDSSelf{
SupportedProxies: map[string][]string{
"envoy": proxysupport.EnvoyVersions,

View File

@ -17,8 +17,8 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"

View File

@ -16,12 +16,13 @@ import (
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"
vaultapi "github.com/hashicorp/vault/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
ca "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/fsm"
@ -549,7 +550,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1)
deps.Logger = logger
s1, err := NewServer(conf1, deps)
s1, err := NewServer(conf1, deps, nil)
require.NoError(t, err)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -9,11 +9,12 @@ import (
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -1527,7 +1528,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps)
srv, err := NewServer(config, deps, nil)
require.NoError(t, err)
defer srv.Shutdown()

View File

@ -32,7 +32,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
agent_grpc "github.com/hashicorp/consul/agent/grpc"
agent_grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"

View File

@ -41,11 +41,11 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/wanfed"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
agentgrpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/subscribe"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
@ -235,6 +235,10 @@ type Server struct {
// is only ever closed.
leaveCh chan struct{}
// publicGRPCServer is the gRPC server exposed on the dedicated gRPC port, as
// opposed to the multiplexed "server" port which is served by grpcHandler.
publicGRPCServer *grpc.Server
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *router.Router
@ -345,7 +349,7 @@ type connHandler interface {
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps) (*Server, error) {
func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
@ -388,6 +392,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
rpcServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))),
insecureRPCServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))),
tlsConfigurator: flat.TLSConfigurator,
publicGRPCServer: publicGRPCServer,
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,

View File

@ -67,7 +67,6 @@ func testTLSCertificates(serverName string) (cert string, key string, cacert str
return cert, privateKey, ca, nil
}
// testServerACLConfig setup some common ACL configurations.
func testServerACLConfig(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
@ -264,7 +263,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
}
}
srv, err := NewServer(c, newDefaultDeps(t, c))
srv, err := NewServer(c, newDefaultDeps(t, c), nil)
if err != nil {
return nil, err
}

View File

@ -5,7 +5,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/rpc/subscribe"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
"github.com/hashicorp/consul/agent/structs"
)

View File

@ -14,8 +14,8 @@ import (
"golang.org/x/sync/errgroup"
gogrpc "google.golang.org/grpc"
grpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"

View File

@ -0,0 +1,35 @@
package middleware
import (
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// PanicHandlerMiddlewareOpts returns the []recovery.Option containing
// recovery handler function.
func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option {
return []recovery.Option{
recovery.WithRecoveryHandler(NewPanicHandler(logger)),
}
}
// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function
// to handle panic in GRPC server's handlers.
func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc {
return func(p interface{}) (err error) {
// Log the panic and the stack trace of the Goroutine that caused the panic.
stacktrace := hclog.Stacktrace()
logger.Error("panic serving grpc request",
"panic", p,
"stack", stacktrace,
)
return status.Errorf(codes.Internal, "grpc: panic serving request")
}
}
type Logger interface {
Error(string, ...interface{})
}

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"context"

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"context"
@ -14,8 +14,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/grpc/private/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport"
@ -145,9 +145,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
InternalRPC: tlsutil.ProtocolConfig{
VerifyIncoming: true,
CAFile: "../../test/hostname/CertAuth.crt",
CertFile: "../../test/hostname/Alice.crt",
KeyFile: "../../test/hostname/Alice.key",
CAFile: "../../../test/hostname/CertAuth.crt",
CertFile: "../../../test/hostname/Alice.crt",
KeyFile: "../../../test/hostname/Alice.key",
VerifyOutgoing: true,
},
}, hclog.New(nil))
@ -192,9 +192,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
InternalRPC: tlsutil.ProtocolConfig{
VerifyIncoming: true,
CAFile: "../../test/hostname/CertAuth.crt",
CertFile: "../../test/hostname/Bob.crt",
KeyFile: "../../test/hostname/Bob.key",
CAFile: "../../../test/hostname/CertAuth.crt",
CertFile: "../../../test/hostname/Bob.crt",
KeyFile: "../../../test/hostname/Bob.key",
VerifyOutgoing: true,
VerifyServerHostname: true,
},
@ -222,9 +222,9 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{
InternalRPC: tlsutil.ProtocolConfig{
VerifyIncoming: true,
CAFile: "../../test/hostname/CertAuth.crt",
CertFile: "../../test/hostname/Betty.crt",
KeyFile: "../../test/hostname/Betty.key",
CAFile: "../../../test/hostname/CertAuth.crt",
CertFile: "../../../test/hostname/Betty.crt",
KeyFile: "../../../test/hostname/Betty.key",
VerifyOutgoing: true,
VerifyServerHostname: true,
},

View File

@ -1,21 +1,16 @@
/*
Package grpc provides a Handler and client for agent gRPC connections.
*/
package grpc
package private
import (
"fmt"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
agentmiddleware "github.com/hashicorp/consul/agent/grpc/middleware"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
@ -26,7 +21,7 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
recoveryOpts := PanicHandlerMiddlewareOpts(logger)
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)
opts := []grpc.ServerOption{
grpc.StatsHandler(newStatsHandler(metrics)),
@ -53,29 +48,6 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)
return &Handler{srv: srv, listener: lis}
}
// PanicHandlerMiddlewareOpts returns the []recovery.Option containing
// recovery handler function.
func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option {
return []recovery.Option{
recovery.WithRecoveryHandler(NewPanicHandler(logger)),
}
}
// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function
// to handle panic in GRPC server's handlers.
func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc {
return func(p interface{}) (err error) {
// Log the panic and the stack trace of the Goroutine that caused the panic.
stacktrace := hclog.Stacktrace()
logger.Error("panic serving grpc request",
"panic", p,
"stack", stacktrace,
)
return status.Errorf(codes.Internal, "grpc: panic serving request")
}
}
// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"bytes"
@ -13,8 +13,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/grpc/private/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
)
func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
@ -57,5 +57,5 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
// Checking the entire stack trace is not possible, let's
// make sure that it contains a couple of expected strings.
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc.(*simplePanic).Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc/private.(*simplePanic).Something`)
}

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
// source: agent/grpc/internal/testservice/simple.proto
// source: agent/grpc/private/internal/testservice/simple.proto
package testservice

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: agent/grpc/internal/testservice/simple.proto
// source: agent/grpc/private/internal/testservice/simple.proto
package testservice
@ -37,7 +37,7 @@ func (m *Req) Reset() { *m = Req{} }
func (m *Req) String() string { return proto.CompactTextString(m) }
func (*Req) ProtoMessage() {}
func (*Req) Descriptor() ([]byte, []int) {
return fileDescriptor_3009a77c573f826d, []int{0}
return fileDescriptor_98af0751f806f450, []int{0}
}
func (m *Req) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -85,7 +85,7 @@ func (m *Resp) Reset() { *m = Resp{} }
func (m *Resp) String() string { return proto.CompactTextString(m) }
func (*Resp) ProtoMessage() {}
func (*Resp) Descriptor() ([]byte, []int) {
return fileDescriptor_3009a77c573f826d, []int{1}
return fileDescriptor_98af0751f806f450, []int{1}
}
func (m *Resp) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -134,24 +134,25 @@ func init() {
}
func init() {
proto.RegisterFile("agent/grpc/internal/testservice/simple.proto", fileDescriptor_3009a77c573f826d)
proto.RegisterFile("agent/grpc/private/internal/testservice/simple.proto", fileDescriptor_98af0751f806f450)
}
var fileDescriptor_3009a77c573f826d = []byte{
// 206 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x49, 0x4c, 0x4f, 0xcd,
0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1,
0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d,
0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62,
0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72, 0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x72, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e,
0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88,
0xa0, 0x99, 0xc3, 0x84, 0x6e, 0x8e, 0x51, 0x2e, 0x17, 0x5b, 0x30, 0xd8, 0x2d, 0x42, 0x46, 0x5c,
0x9c, 0xc1, 0xf9, 0xb9, 0xa9, 0x25, 0x19, 0x99, 0x79, 0xe9, 0x42, 0x02, 0x7a, 0x48, 0x6e, 0xd2,
0x0b, 0x4a, 0x2d, 0x94, 0x12, 0x44, 0x13, 0x29, 0x2e, 0x50, 0x62, 0x10, 0xd2, 0xe7, 0x62, 0x71,
0xcb, 0xc9, 0x2f, 0x27, 0x52, 0xb9, 0x01, 0xa3, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e,
0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0x38, 0x0c, 0x8c,
0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x4b, 0x16, 0x40, 0x33, 0x01, 0x00, 0x00,
var fileDescriptor_98af0751f806f450 = []byte{
// 214 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x49, 0x4c, 0x4f, 0xcd,
0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2c, 0x4b, 0x2c, 0x49, 0xd5, 0xcf,
0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a,
0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9,
0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62, 0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72,
0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12,
0x51, 0x72, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e, 0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d,
0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88, 0xa0, 0x99, 0xc3, 0x84, 0x6e, 0x8e, 0x51, 0x2e,
0x17, 0x5b, 0x30, 0xd8, 0x2d, 0x42, 0x46, 0x5c, 0x9c, 0xc1, 0xf9, 0xb9, 0xa9, 0x25, 0x19, 0x99,
0x79, 0xe9, 0x42, 0x02, 0x7a, 0x48, 0x6e, 0xd2, 0x0b, 0x4a, 0x2d, 0x94, 0x12, 0x44, 0x13, 0x29,
0x2e, 0x50, 0x62, 0x10, 0xd2, 0xe7, 0x62, 0x71, 0xcb, 0xc9, 0x2f, 0x27, 0x52, 0xb9, 0x01, 0xa3,
0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3,
0xb1, 0x1c, 0x43, 0x12, 0x1b, 0x38, 0x0c, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x76, 0xce,
0x88, 0x7d, 0x3b, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -295,7 +296,7 @@ var _Simple_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
},
Metadata: "agent/grpc/internal/testservice/simple.proto",
Metadata: "agent/grpc/private/internal/testservice/simple.proto",
}
func (m *Req) Marshal() (dAtA []byte, err error) {

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"context"
@ -10,13 +10,12 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/private/internal/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"

View File

@ -21,7 +21,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"context"

View File

@ -1,4 +1,4 @@
package grpc
package private
import (
"context"
@ -14,7 +14,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/private/internal/testservice"
"github.com/hashicorp/go-hclog"
)

View File

@ -0,0 +1,34 @@
package public
import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
agentmiddleware "github.com/hashicorp/consul/agent/grpc/middleware"
"github.com/hashicorp/consul/tlsutil"
)
// NewServer constructs a gRPC server for the public gRPC port, to which
// handlers can be registered.
func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.Server {
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
),
}
if tls != nil && tls.GRPCTLSConfigured() {
creds := credentials.NewTLS(tls.IncomingGRPCConfig())
opts = append(opts, grpc.Creds(creds))
}
return grpc.NewServer(opts...)
}

View File

@ -412,7 +412,7 @@ func DialRPCViaMeshGateway(
}
if nextProto != ALPN_RPCGRPC {
// agent/grpc/client.go:dial() handles this in another way for gRPC
// agent/grpc/private/client.go:dial() handles this in another way for gRPC
if tcp, ok := rawConn.(*net.TCPConn); ok {
_ = tcp.SetKeepAlive(true)
_ = tcp.SetNoDelay(true)

View File

@ -20,12 +20,11 @@ import (
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
@ -265,7 +264,6 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
grpc.StatsCounters,
local.StateCounters,
raftCounters,
middleware.NewRPCCounters,
}
// Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?

View File

@ -22,7 +22,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/rpc/subscribe"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"

View File

@ -7,24 +7,19 @@ import (
"time"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/tlsutil"
)
var StatsGauges = []prometheus.GaugeDefinition{
@ -206,30 +201,9 @@ func tokenFromContext(ctx context.Context) string {
return ""
}
// NewGRPCServer creates a grpc.Server, registers the Server, and then returns
// the grpc.Server.
func NewGRPCServer(s *Server, tlsConfigurator *tlsutil.Configurator) *grpc.Server {
recoveryOpts := agentgrpc.PanicHandlerMiddlewareOpts(s.Logger)
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
),
}
if tlsConfigurator != nil && tlsConfigurator.GRPCTLSConfigured() {
creds := credentials.NewTLS(tlsConfigurator.IncomingGRPCConfig())
opts = append(opts, grpc.Creds(creds))
}
srv := grpc.NewServer(opts...)
// Register the XDS server handlers to the given gRPC server.
func (s *Server) Register(srv *grpc.Server) {
envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s)
return srv
}
// authorize the xDS request using the token stored in ctx. This authorization is

View File

@ -34,7 +34,7 @@ and sent to any active subscriptions.
[rpcclient/health.Health]: https://github.com/hashicorp/consul/blob/main/agent/rpcclient/health/health.go
[StreamingHealthServices cache-type]: https://github.com/hashicorp/consul/blob/main/agent/cache-types/streaming_health_services.go
[materialized view]: https://github.com/hashicorp/consul/blob/main/agent/submatview/materializer.go
[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/main/agent/rpc/subscribe/subscribe.go
[SubscribeEndpoint]: https://github.com/hashicorp/consul/blob/main/agent/grpc/private/services/subscribe/subscribe.go
[EventPublisher]: https://github.com/hashicorp/consul/blob/main/agent/consul/stream/event_publisher.go
[state.Store commits]: https://github.com/hashicorp/consul/blob/main/agent/consul/state/memdb.go
@ -98,4 +98,3 @@ and filtering happens.
![event filtering](./event-filtering.svg)
<sup>[source](./event-filtering.mmd)</sup>