Add separate grpc_tls port.

To ease the transition for users, the original gRPC
port can still operate in a deprecated mode as either
plain-text or TLS mode. This behavior should be removed
in a future release whenever we no longer support this.

The resulting behavior from this commit is:
  `ports.grpc > 0 && ports.grpc_tls > 0` spawns both plain-text and tls ports.
  `ports.grpc > 0 && grpc.tls == undefined` spawns a single plain-text port.
  `ports.grpc > 0 && grpc.tls != undefined` spawns a single tls port (backwards compat mode).
This commit is contained in:
Derek Menteer 2022-08-19 12:07:22 -05:00
parent 1099665473
commit 1255a8a20d
23 changed files with 296 additions and 117 deletions

View File

@ -213,10 +213,14 @@ type Agent struct {
// depending on the configuration
delegate delegate
// externalGRPCServer is the gRPC server exposed on the dedicated gRPC port (as
// externalGRPCServer is the gRPC server exposed on a dedicated gRPC port (as
// opposed to the multiplexed "server" port).
externalGRPCServer *grpc.Server
// externalGRPCTLSServer is the gRPC server exposed on a dedicated gRPC-TLS port (as
// opposed to the multiplexed "server" port).
externalGRPCTLSServer *grpc.Server
// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
State *local.State
@ -384,18 +388,18 @@ type Agent struct {
// New process the desired options and creates a new Agent.
// This process will
// * parse the config given the config Flags
// * setup logging
// * using predefined logger given in an option
// OR
// * initialize a new logger from the configuration
// including setting up gRPC logging
// * initialize telemetry
// * create a TLS Configurator
// * build a shared connection pool
// * create the ServiceManager
// * setup the NodeID if one isn't provided in the configuration
// * create the AutoConfig object for future use in fully
// - parse the config given the config Flags
// - setup logging
// - using predefined logger given in an option
// OR
// - initialize a new logger from the configuration
// including setting up gRPC logging
// - initialize telemetry
// - create a TLS Configurator
// - build a shared connection pool
// - create the ServiceManager
// - setup the NodeID if one isn't provided in the configuration
// - create the AutoConfig object for future use in fully
// resolving the configuration
func New(bd BaseDeps) (*Agent, error) {
a := Agent{
@ -539,7 +543,8 @@ func (a *Agent) Start(ctx context.Context) error {
// This needs to happen after the initial auto-config is loaded, because TLS
// can only be configured on the gRPC server at the point of creation.
a.buildExternalGRPCServer()
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers(
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
if err := a.startLicenseManager(ctx); err != nil {
return err
@ -578,7 +583,14 @@ func (a *Agent) Start(ctx context.Context) error {
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer)
var externalGRPCServers []*grpc.Server
if a.externalGRPCServer != nil {
externalGRPCServers = append(externalGRPCServers, a.externalGRPCServer)
}
if a.externalGRPCTLSServer != nil {
externalGRPCServers = append(externalGRPCServers, a.externalGRPCTLSServer)
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, externalGRPCServers)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
@ -702,7 +714,7 @@ func (a *Agent) Start(ctx context.Context) error {
a.apiServers.Start(srv)
}
// Start gRPC server.
// Start gRPC and gRPC+TLS servers.
if err := a.listenAndServeGRPC(); err != nil {
return err
}
@ -760,15 +772,10 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}
func (a *Agent) buildExternalGRPCServer() {
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), a.tlsConfigurator)
}
func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
var cfg xds.ProxyConfigSource = localproxycfg.NewConfigSource(a.proxyConfig)
@ -787,7 +794,6 @@ func (a *Agent) listenAndServeGRPC() error {
}()
cfg = catalogCfg
}
a.xdsServer = xds.NewServer(
a.config.NodeName,
a.logger.Named(logging.Envoy),
@ -798,24 +804,44 @@ func (a *Agent) listenAndServeGRPC() error {
},
a,
)
a.xdsServer.Register(a.externalGRPCServer)
ln, err := a.startListeners(a.config.GRPCAddrs)
if err != nil {
return err
// Spawn listeners and register xds servers.
var listeners []net.Listener
start := func(proto string, addrs []net.Addr, srv *grpc.Server) error {
if len(addrs) < 1 || srv == nil {
return nil
}
a.xdsServer.Register(srv)
ln, err := a.startListeners(addrs)
if err != nil {
return err
}
listeners = append(listeners, ln...)
for _, l := range ln {
go func(innerL net.Listener) {
a.logger.Info("Started gRPC server",
"protocol", proto,
"address", innerL.Addr().String(),
"network", innerL.Addr().Network(),
)
err := srv.Serve(innerL)
if err != nil {
a.logger.Error("gRPC server failed", "protocol", proto, "error", err)
}
}(l)
}
return nil
}
for _, l := range ln {
go func(innerL net.Listener) {
a.logger.Info("Started gRPC server",
"address", innerL.Addr().String(),
"network", innerL.Addr().Network(),
)
err := a.externalGRPCServer.Serve(innerL)
if err != nil {
a.logger.Error("gRPC server failed", "error", err)
}
}(l)
if err := start("grpc", a.config.GRPCAddrs, a.externalGRPCServer); err != nil {
closeListeners(listeners)
return err
}
if err := start("grpc_tls", a.config.GRPCTLSAddrs, a.externalGRPCTLSServer); err != nil {
closeListeners(listeners)
return err
}
return nil
}
@ -1505,7 +1531,12 @@ func (a *Agent) ShutdownAgent() error {
}
// Stop gRPC
a.externalGRPCServer.Stop()
if a.externalGRPCServer != nil {
a.externalGRPCServer.Stop()
}
if a.externalGRPCTLSServer != nil {
a.externalGRPCTLSServer.Stop()
}
// Stop the proxy config manager
if a.proxyConfig != nil {
@ -3934,6 +3965,10 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading tls configuration: %s", err)
}
// Setup the external GRPC servers.
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers(
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
// Reload service/check definitions and metadata.
if err := a.loadServices(newCfg, snap); err != nil {
return fmt.Errorf("Failed reloading services: %s", err)

View File

@ -45,7 +45,11 @@ type Self struct {
type XDSSelf struct {
SupportedProxies map[string][]string
Port int
// Port could be used for either TLS or plain-text communication
// up through version 1.14. In order to maintain backwards-compatibility,
// Port will now default to TLS and fallback to the standard port value.
Port int
PortTLS int
}
func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -78,7 +82,13 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
SupportedProxies: map[string][]string{
"envoy": proxysupport.EnvoyVersions,
},
Port: s.agent.config.GRPCPort,
// Prefer the TLS port. See comment on the XDSSelf struct for details.
Port: s.agent.config.GRPCTLSPort,
PortTLS: s.agent.config.GRPCTLSPort,
}
// Fallback to standard port if TLS is not enabled.
if xds.PortTLS <= 0 {
xds.Port = s.agent.config.GRPCPort
}
}

View File

@ -1434,15 +1434,8 @@ func TestAgent_Self(t *testing.T) {
cases := map[string]struct {
hcl string
expectXDS bool
grpcTLS bool
}{
"normal": {
hcl: `
node_meta {
somekey = "somevalue"
}
`,
expectXDS: true,
},
"no grpc": {
hcl: `
node_meta {
@ -1453,13 +1446,35 @@ func TestAgent_Self(t *testing.T) {
}
`,
expectXDS: false,
grpcTLS: false,
},
"plaintext grpc": {
hcl: `
node_meta {
somekey = "somevalue"
}
`,
expectXDS: true,
grpcTLS: false,
},
"tls grpc": {
hcl: `
node_meta {
somekey = "somevalue"
}
`,
expectXDS: true,
grpcTLS: true,
},
}
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
a := NewTestAgent(t, tc.hcl)
a := StartTestAgent(t, TestAgent{
HCL: tc.hcl,
UseGRPCTLS: tc.grpcTLS,
})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -1487,6 +1502,12 @@ func TestAgent_Self(t *testing.T) {
map[string][]string{"envoy": proxysupport.EnvoyVersions},
val.XDS.SupportedProxies,
)
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.PortTLS)
if tc.grpcTLS {
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Port)
} else {
require.Equal(t, a.Config.GRPCPort, val.XDS.Port)
}
} else {
require.Nil(t, val.XDS, "xds component should be missing when gRPC is disabled")

View File

@ -2095,7 +2095,7 @@ func TestAgent_HTTPCheck_EnableAgentTLSForChecks(t *testing.T) {
run := func(t *testing.T, ca string) {
a := StartTestAgent(t, TestAgent{
UseTLS: true,
UseHTTPS: true,
HCL: `
enable_agent_tls_for_checks = true
@ -3860,7 +3860,7 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) {
}
t.Parallel()
a := TestAgent{UseTLS: true}
a := TestAgent{UseHTTPS: true}
if err := a.Start(t); err != nil {
t.Fatal(err)
}
@ -5207,7 +5207,7 @@ func TestAgent_AutoEncrypt(t *testing.T) {
server = ` + strconv.Itoa(srv.Config.RPCBindAddr.Port) + `
}
retry_join = ["` + srv.Config.SerfBindAddrLAN.String() + `"]`,
UseTLS: true,
UseHTTPS: true,
})
defer client.Shutdown()

View File

@ -125,10 +125,10 @@ type LoadResult struct {
//
// The sources are merged in the following order:
//
// * default configuration
// * config files in alphabetical order
// * command line arguments
// * overrides
// - default configuration
// - config files in alphabetical order
// - command line arguments
// - overrides
//
// The config sources are merged sequentially and later values overwrite
// previously set values. Slice values are merged by concatenating the two slices.
@ -433,6 +433,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
httpsPort := b.portVal("ports.https", c.Ports.HTTPS)
serverPort := b.portVal("ports.server", c.Ports.Server)
grpcPort := b.portVal("ports.grpc", c.Ports.GRPC)
grpcTlsPort := b.portVal("ports.grpc_tls", c.Ports.GRPCTLS)
serfPortLAN := b.portVal("ports.serf_lan", c.Ports.SerfLAN)
serfPortWAN := b.portVal("ports.serf_wan", c.Ports.SerfWAN)
proxyMinPort := b.portVal("ports.proxy_min_port", c.Ports.ProxyMinPort)
@ -563,6 +564,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
httpAddrs := b.makeAddrs(b.expandAddrs("addresses.http", c.Addresses.HTTP), clientAddrs, httpPort)
httpsAddrs := b.makeAddrs(b.expandAddrs("addresses.https", c.Addresses.HTTPS), clientAddrs, httpsPort)
grpcAddrs := b.makeAddrs(b.expandAddrs("addresses.grpc", c.Addresses.GRPC), clientAddrs, grpcPort)
grpcTlsAddrs := b.makeAddrs(b.expandAddrs("addresses.grpc_tls", c.Addresses.GRPCTLS), clientAddrs, grpcTlsPort)
for _, a := range dnsAddrs {
if x, ok := a.(*net.TCPAddr); ok {
@ -987,8 +989,10 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
EnableRemoteScriptChecks: enableRemoteScriptChecks,
EnableLocalScriptChecks: enableLocalScriptChecks,
EncryptKey: stringVal(c.EncryptKey),
GRPCPort: grpcPort,
GRPCAddrs: grpcAddrs,
GRPCPort: grpcPort,
GRPCTLSAddrs: grpcTlsAddrs,
GRPCTLSPort: grpcTlsPort,
HTTPMaxConnsPerClient: intVal(c.Limits.HTTPMaxConnsPerClient),
HTTPSHandshakeTimeout: b.durationVal("limits.https_handshake_timeout", c.Limits.HTTPSHandshakeTimeout),
KVMaxValueSize: uint64Val(c.Limits.KVMaxValueSize),

View File

@ -332,10 +332,11 @@ type Consul struct {
}
type Addresses struct {
DNS *string `mapstructure:"dns"`
HTTP *string `mapstructure:"http"`
HTTPS *string `mapstructure:"https"`
GRPC *string `mapstructure:"grpc"`
DNS *string `mapstructure:"dns"`
HTTP *string `mapstructure:"http"`
HTTPS *string `mapstructure:"https"`
GRPC *string `mapstructure:"grpc"`
GRPCTLS *string `mapstructure:"grpc_tls"`
}
type AdvertiseAddrsConfig struct {
@ -694,6 +695,7 @@ type Ports struct {
SerfWAN *int `mapstructure:"serf_wan"`
Server *int `mapstructure:"server"`
GRPC *int `mapstructure:"grpc"`
GRPCTLS *int `mapstructure:"grpc_tls"`
ProxyMinPort *int `mapstructure:"proxy_min_port"`
ProxyMaxPort *int `mapstructure:"proxy_max_port"`
SidecarMinPort *int `mapstructure:"sidecar_min_port"`

View File

@ -53,7 +53,8 @@ func AddFlags(fs *flag.FlagSet, f *LoadOpts) {
add(&f.FlagValues.EnableLocalScriptChecks, "enable-local-script-checks", "Enables health check scripts from configuration file.")
add(&f.FlagValues.HTTPConfig.AllowWriteHTTPFrom, "allow-write-http-from", "Only allow write endpoint calls from given network. CIDR format, can be specified multiple times.")
add(&f.FlagValues.EncryptKey, "encrypt", "Provides the gossip encryption key.")
add(&f.FlagValues.Ports.GRPC, "grpc-port", "Sets the gRPC API port to listen on (currently needed for Envoy xDS only).")
add(&f.FlagValues.Ports.GRPC, "grpc-port", "Sets the gRPC API port to listen on.")
add(&f.FlagValues.Ports.GRPCTLS, "grpc-tls-port", "Sets the gRPC-TLS API port to listen on.")
add(&f.FlagValues.Ports.HTTP, "http-port", "Sets the HTTP API port to listen on.")
add(&f.FlagValues.Ports.HTTPS, "https-port", "Sets the HTTPS API port to listen on.")
add(&f.FlagValues.StartJoinAddrsLAN, "join", "Address of an agent to join at start time. Can be specified multiple times.")

View File

@ -670,13 +670,18 @@ type RuntimeConfig struct {
// flag: -encrypt string
EncryptKey string
// GRPCPort is the port the gRPC server listens on. Currently this only
// exposes the xDS and ext_authz APIs for Envoy and it is disabled by default.
// GRPCPort is the port the gRPC server listens on. It is disabled by default.
//
// hcl: ports { grpc = int }
// flags: -grpc-port int
GRPCPort int
// GRPCTLSPort is the port the gRPC server listens on. It is disabled by default.
//
// hcl: ports { grpc_tls = int }
// flags: -grpc-tls-port int
GRPCTLSPort int
// GRPCAddrs contains the list of TCP addresses and UNIX sockets the gRPC
// server will bind to. If the gRPC endpoint is disabled (ports.grpc <= 0)
// the list is empty.
@ -692,6 +697,21 @@ type RuntimeConfig struct {
// hcl: client_addr = string addresses { grpc = string } ports { grpc = int }
GRPCAddrs []net.Addr
// GRPCTLSAddrs contains the list of TCP addresses and UNIX sockets the gRPC
// server will bind to. If the gRPC endpoint is disabled (ports.grpc <= 0)
// the list is empty.
//
// The addresses are taken from 'addresses.grpc_tls' which should contain a
// space separated list of ip addresses, UNIX socket paths and/or
// go-sockaddr templates. UNIX socket paths must be written as
// 'unix://<full path>', e.g. 'unix:///var/run/consul-grpc.sock'.
//
// If 'addresses.grpc' was not provided the 'client_addr' addresses are
// used.
//
// hcl: client_addr = string addresses { grpc_tls = string } ports { grpc_tls = int }
GRPCTLSAddrs []net.Addr
// HTTPAddrs contains the list of TCP addresses and UNIX sockets the HTTP
// server will bind to. If the HTTP endpoint is disabled (ports.http <= 0)
// the list is empty.

View File

@ -6016,6 +6016,8 @@ func TestLoad_FullConfig(t *testing.T) {
GRPCPort: 4881,
GRPCAddrs: []net.Addr{tcpAddr("32.31.61.91:4881")},
GRPCTLSPort: 5201,
GRPCTLSAddrs: []net.Addr{tcpAddr("23.14.88.19:5201")},
HTTPAddrs: []net.Addr{tcpAddr("83.39.91.39:7999")},
HTTPBlockEndpoints: []string{"RBvAFcGD", "fWOWFznh"},
AllowWriteHTTPFrom: []*net.IPNet{cidr("127.0.0.0/8"), cidr("22.33.44.55/32"), cidr("0.0.0.0/0")},

View File

@ -192,6 +192,8 @@
"ExposeMinPort": 0,
"GRPCAddrs": [],
"GRPCPort": 0,
"GRPCTLSAddrs": [],
"GRPCTLSPort": 0,
"GossipLANGossipInterval": "0s",
"GossipLANGossipNodes": 0,
"GossipLANProbeInterval": "0s",

View File

@ -44,6 +44,7 @@ addresses = {
http = "83.39.91.39"
https = "95.17.17.19"
grpc = "32.31.61.91"
grpc_tls = "23.14.88.19"
}
advertise_addr = "17.99.29.16"
advertise_addr_wan = "78.63.37.19"
@ -320,6 +321,7 @@ ports {
https = 15127
server = 3757
grpc = 4881
grpc_tls = 5201
proxy_min_port = 2000
proxy_max_port = 3000
sidecar_min_port = 8888

View File

@ -44,7 +44,8 @@
"dns": "93.95.95.81",
"http": "83.39.91.39",
"https": "95.17.17.19",
"grpc": "32.31.61.91"
"grpc": "32.31.61.91",
"grpc_tls": "23.14.88.19"
},
"advertise_addr": "17.99.29.16",
"advertise_addr_wan": "78.63.37.19",
@ -320,6 +321,7 @@
"https": 15127,
"server": 3757,
"grpc": 4881,
"grpc_tls": 5201,
"sidecar_min_port": 8888,
"sidecar_max_port": 9999,
"expose_min_port": 1111,

View File

@ -552,7 +552,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1)
deps.Logger = logger
s1, err := NewServer(conf1, deps, grpc.NewServer())
s1, err := NewServer(conf1, deps, []*grpc.Server{grpc.NewServer()})
require.NoError(t, err)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -580,7 +580,9 @@ func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
require.NoError(t, acceptingServer.Shutdown())
// Have to manually shut down the gRPC server otherwise it stays bound to the port.
acceptingServer.externalGRPCServer.Stop()
for i := range acceptingServer.externalGRPCServers {
acceptingServer.externalGRPCServers[i].Stop()
}
// Restart the server by re-using the previous acceptor's data directory and node id.
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
@ -1490,7 +1492,9 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
testutil.RunStep(t, "updated server addresses are picked up by the leader", func(t *testing.T) {
// force close the acceptor's gRPC server so the dialier retries with a new address.
acceptor.externalGRPCServer.Stop()
for i := range acceptor.externalGRPCServers {
acceptor.externalGRPCServers[i].Stop()
}
clone := proto.Clone(p.Peering)
updated := clone.(*pbpeering.Peering)

View File

@ -1529,7 +1529,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps, grpc.NewServer())
srv, err := NewServer(config, deps, []*grpc.Server{grpc.NewServer()})
require.NoError(t, err)
defer srv.Shutdown()

View File

@ -253,9 +253,9 @@ type Server struct {
// enable RPC forwarding.
externalConnectCAServer *connectca.Server
// externalGRPCServer is the gRPC server exposed on the dedicated gRPC port, as
// externalGRPCServers has all gRPC servers exposed on the dedicated gRPC ports, as
// opposed to the multiplexed "server" port which is served by grpcHandler.
externalGRPCServer *grpc.Server
externalGRPCServers []*grpc.Server
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
@ -377,7 +377,9 @@ type Server struct {
// embedded struct to hold all the enterprise specific data
EnterpriseServer
}
type GRPCService interface {
Register(*grpc.Server)
}
type connHandler interface {
Run() error
Handle(conn net.Conn)
@ -386,7 +388,7 @@ type connHandler interface {
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
@ -432,7 +434,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.Router,
tlsConfigurator: flat.TLSConfigurator,
externalGRPCServer: externalGRPCServer,
externalGRPCServers: externalGRPCServers,
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
@ -679,6 +681,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Helper function for registering to all GRPC servers
registerGrpc := func(g GRPCService) {
for _, srv := range s.externalGRPCServers {
g.Register(srv)
}
}
// Initialize external gRPC server - register services on external gRPC server.
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
@ -696,7 +705,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
PrimaryDatacenter: s.config.PrimaryDatacenter,
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
})
s.externalACLServer.Register(s.externalGRPCServer)
registerGrpc(s.externalACLServer)
s.externalConnectCAServer = connectca.NewServer(connectca.Config{
Publisher: s.publisher,
@ -709,20 +718,22 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
},
ConnectEnabled: s.config.ConnectEnabled,
})
s.externalConnectCAServer.Register(s.externalGRPCServer)
registerGrpc(s.externalConnectCAServer)
dataplane.NewServer(dataplane.Config{
dataplaneServer := dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
}).Register(s.externalGRPCServer)
})
registerGrpc(dataplaneServer)
serverdiscovery.NewServer(serverdiscovery.Config{
serverDiscoveryServer := serverdiscovery.NewServer(serverdiscovery.Config{
Publisher: s.publisher,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.server-discovery"),
}).Register(s.externalGRPCServer)
})
registerGrpc(serverDiscoveryServer)
s.peerStreamTracker = peerstream.NewTracker()
s.peeringBackend = NewPeeringBackend(s)
@ -743,7 +754,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
},
})
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
s.peerStreamServer.Register(s.externalGRPCServer)
registerGrpc(s.peerStreamServer)
// Initialize internal gRPC server.
//
@ -1575,12 +1586,12 @@ func (s *Server) Stats() map[string]map[string]string {
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.

View File

@ -249,9 +249,9 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
ln, err := net.Listen("tcp", externalGRPCAddr)
require.NoError(t, err)
go func() {
_ = srv.externalGRPCServer.Serve(ln)
_ = srv.externalGRPCServers[0].Serve(ln)
}()
t.Cleanup(srv.externalGRPCServer.Stop)
t.Cleanup(srv.externalGRPCServers[0].Stop)
}
return dir, srv
@ -301,7 +301,18 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
}
}
srv, err := NewServer(c, deps, external.NewServer(deps.Logger.Named("grpc.external"), deps.TLSConfigurator))
// setup grpc servers
srvGRPC, srvGRPCTLS := external.BuildExternalGRPCServers(
c.GRPCPort, c.GRPCTLSPort, deps.TLSConfigurator, deps.Logger)
var grpcServers []*grpc.Server
if srvGRPC != nil {
grpcServers = append(grpcServers, srvGRPC)
}
if srvGRPCTLS != nil {
grpcServers = append(grpcServers, srvGRPCTLS)
}
srv, err := NewServer(c, deps, grpcServers)
if err != nil {
return nil, err
}
@ -1208,7 +1219,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
}
}
s1, err := NewServer(conf, deps, grpc.NewServer())
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1246,7 +1257,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
return nil
}
s2, err := NewServer(conf, deps, grpc.NewServer())
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1280,7 +1291,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil
s1, err := NewServer(conf, deps, grpc.NewServer())
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
@ -1299,7 +1310,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
return nil
}
s2, err := NewServer(conf, deps, grpc.NewServer())
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")

View File

@ -11,6 +11,7 @@ import (
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
)
// NewServer constructs a gRPC server for the external gRPC port, to which
@ -41,3 +42,32 @@ func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.S
}
return grpc.NewServer(opts...)
}
// BuildExternalGRPCServers constructs two gRPC servers for the external gRPC ports.
// This function exists because behavior for the original `ports.grpc` is dependent on
// whether the new `ports.grpc_tls` is defined. This behavior should be simplified in
// a future release so that the `ports.grpc` is always plain-text and not dependent on
// the `ports.grpc_tls` configuration.
func BuildExternalGRPCServers(grpcPort int, grpcTLSPort int, t *tlsutil.Configurator, l hclog.InterceptLogger) (grpc, grpcTLS *grpc.Server) {
if grpcPort > 0 {
// TODO: remove this deprecated behavior in a future version and only support plain-text for this port.
if grpcTLSPort > 0 {
// Use plain-text if the new grpc_tls port is configured.
grpc = NewServer(l.Named("grpc.external"), nil)
} else {
// Otherwise, check TLS configuration to determine whether to encrypt (for backwards compatibility).
grpc = NewServer(l.Named("grpc.external"), t)
if t != nil && t.GRPCServerUseTLS() {
l.Warn("deprecated gRPC TLS configuration detected. Consider using `ports.grpc_tls` instead")
}
}
}
if grpcTLSPort > 0 {
if t.GRPCServerUseTLS() {
grpcTLS = NewServer(l.Named("grpc_tls.external"), t)
} else {
l.Error("error starting gRPC TLS server", "error", "port is set, but invalid TLS configuration detected")
}
}
return
}

View File

@ -148,7 +148,7 @@ func TestSetupHTTPServer_HTTP2(t *testing.T) {
// Fire up an agent with TLS enabled.
a := StartTestAgent(t, TestAgent{
UseTLS: true,
UseHTTPS: true,
HCL: `
key_file = "../test/client_certs/server.key"
cert_file = "../test/client_certs/server.crt"
@ -1549,7 +1549,7 @@ func TestHTTPServer_HandshakeTimeout(t *testing.T) {
// Fire up an agent with TLS enabled.
a := StartTestAgent(t, TestAgent{
UseTLS: true,
UseHTTPS: true,
HCL: `
key_file = "../test/client_certs/server.key"
cert_file = "../test/client_certs/server.crt"
@ -1621,7 +1621,7 @@ func TestRPC_HTTPSMaxConnsPerClient(t *testing.T) {
// Fire up an agent with TLS enabled.
a := StartTestAgent(t, TestAgent{
UseTLS: tc.tlsEnabled,
UseHTTPS: tc.tlsEnabled,
HCL: hclPrefix + `
limits {
http_max_conns_per_client = 2

View File

@ -1325,7 +1325,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
externalGRPCServer := gogrpc.NewServer()
deps := newDefaultDeps(t, conf)
server, err := consul.NewServer(conf, deps, externalGRPCServer)
server, err := consul.NewServer(conf, deps, []*gogrpc.Server{externalGRPCServer})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, server.Shutdown())

View File

@ -66,9 +66,13 @@ type TestAgent struct {
// and the directory will be removed once the test ends.
DataDir string
// UseTLS, if true, will disable the HTTP port and enable the HTTPS
// UseHTTPS, if true, will disable the HTTP port and enable the HTTPS
// one.
UseTLS bool
UseHTTPS bool
// UseGRPCTLS, if true, will disable the GRPC port and enable the GRPC+TLS
// one.
UseGRPCTLS bool
// dns is a reference to the first started DNS endpoint.
// It is valid after Start().
@ -183,7 +187,7 @@ func (a *TestAgent) Start(t *testing.T) error {
Name: name,
})
portsConfig := randomPortsSource(t, a.UseTLS)
portsConfig := randomPortsSource(t, a.UseHTTPS, a.UseGRPCTLS)
// Create NodeID outside the closure, so that it does not change
testHCLConfig := TestConfigHCL(NodeID())
@ -401,11 +405,11 @@ func (a *TestAgent) consulConfig() *consul.Config {
// chance of port conflicts for concurrently executed test binaries.
// Instead of relying on one set of ports to be sufficient we retry
// starting the agent with different ports on port conflict.
func randomPortsSource(t *testing.T, tls bool) string {
ports := freeport.GetN(t, 7)
func randomPortsSource(t *testing.T, useHTTPS bool, useGRPCTLS bool) string {
ports := freeport.GetN(t, 8)
var http, https int
if tls {
if useHTTPS {
http = -1
https = ports[2]
} else {
@ -413,6 +417,15 @@ func randomPortsSource(t *testing.T, tls bool) string {
https = -1
}
var grpc, grpcTLS int
if useGRPCTLS {
grpc = -1
grpcTLS = ports[7]
} else {
grpc = ports[6]
grpcTLS = -1
}
return `
ports = {
dns = ` + strconv.Itoa(ports[0]) + `
@ -421,7 +434,8 @@ func randomPortsSource(t *testing.T, tls bool) string {
serf_lan = ` + strconv.Itoa(ports[3]) + `
serf_wan = ` + strconv.Itoa(ports[4]) + `
server = ` + strconv.Itoa(ports[5]) + `
grpc = ` + strconv.Itoa(ports[6]) + `
grpc = ` + strconv.Itoa(grpc) + `
grpc_tls = ` + strconv.Itoa(grpcTLS) + `
}
`
}

View File

@ -213,8 +213,8 @@ func (c *cmd) run(args []string) int {
}
ui.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
ui.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.ServerMode, config.Bootstrap))
ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, gRPC: %d, DNS: %d)", config.ClientAddrs,
config.HTTPPort, config.HTTPSPort, config.GRPCPort, config.DNSPort))
ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, gRPC: %d, gRPC-TLS: %d, DNS: %d)", config.ClientAddrs,
config.HTTPPort, config.HTTPSPort, config.GRPCPort, config.GRPCTLSPort, config.DNSPort))
ui.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddrLAN,
config.SerfPortLAN, config.SerfPortWAN))
ui.Info(fmt.Sprintf("Gossip Encryption: %t", config.EncryptKey != ""))

View File

@ -639,7 +639,7 @@ func (c *cmd) xdsAddress(httpCfg *api.Config) (GRPC, error) {
addr := c.grpcAddr
if addr == "" {
port, err := c.lookupXDSPort()
port, protocol, err := c.lookupXDSPort()
if err != nil {
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
}
@ -648,7 +648,7 @@ func (c *cmd) xdsAddress(httpCfg *api.Config) (GRPC, error) {
// enabled.
port = 8502
}
addr = fmt.Sprintf("localhost:%v", port)
addr = fmt.Sprintf("%vlocalhost:%v", protocol, port)
}
// TODO: parse addr as a url instead of strings.HasPrefix/TrimPrefix
@ -697,39 +697,47 @@ func (c *cmd) xdsAddress(httpCfg *api.Config) (GRPC, error) {
return g, nil
}
func (c *cmd) lookupXDSPort() (int, error) {
func (c *cmd) lookupXDSPort() (int, string, error) {
self, err := c.client.Agent().Self()
if err != nil {
return 0, err
return 0, "", err
}
type response struct {
XDS struct {
Port int
Port int
PortTLS int
}
}
var resp response
if err := mapstructure.Decode(self, &resp); err == nil && resp.XDS.Port != 0 {
return resp.XDS.Port, nil
// Determine the protocol based on the provided Port matching PortTLS
proto := "http://"
// TODO: Simplify this check after 1.14 when Port is guaranteed to be plain-text.
if resp.XDS.Port == resp.XDS.PortTLS {
proto = "https://"
}
return resp.XDS.Port, proto, nil
}
// Fallback to old API for the case where a new consul CLI is being used with
// an older API version.
cfg, ok := self["DebugConfig"]
if !ok {
return 0, fmt.Errorf("unexpected agent response: no debug config")
return 0, "", fmt.Errorf("unexpected agent response: no debug config")
}
// TODO what does this mean? What did the old API look like? How does this affect compatibility?
port, ok := cfg["GRPCPort"]
if !ok {
return 0, fmt.Errorf("agent does not have grpc port enabled")
return 0, "", fmt.Errorf("agent does not have grpc port enabled")
}
portN, ok := port.(float64)
if !ok {
return 0, fmt.Errorf("invalid grpc port in agent response")
return 0, "", fmt.Errorf("invalid grpc port in agent response")
}
return int(portN), nil
return int(portN), "", nil
}
func (c *cmd) Synopsis() string {