mirror of https://github.com/status-im/consul.git
Address PR comments.
This commit is contained in:
parent
cf7f24a6ec
commit
f64771c707
|
@ -217,10 +217,6 @@ type Agent struct {
|
|||
// opposed to the multiplexed "server" port).
|
||||
externalGRPCServer *grpc.Server
|
||||
|
||||
// externalGRPCTLSServer is the gRPC server exposed on a dedicated gRPC-TLS port (as
|
||||
// opposed to the multiplexed "server" port).
|
||||
externalGRPCTLSServer *grpc.Server
|
||||
|
||||
// state stores a local representation of the node,
|
||||
// services and checks. Used for anti-entropy.
|
||||
State *local.State
|
||||
|
@ -543,8 +539,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
|
||||
// This needs to happen after the initial auto-config is loaded, because TLS
|
||||
// can only be configured on the gRPC server at the point of creation.
|
||||
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers(
|
||||
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
|
||||
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"))
|
||||
|
||||
if err := a.startLicenseManager(ctx); err != nil {
|
||||
return err
|
||||
|
@ -583,14 +578,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
|
||||
// Setup either the client or the server.
|
||||
if c.ServerMode {
|
||||
var externalGRPCServers []*grpc.Server
|
||||
if a.externalGRPCServer != nil {
|
||||
externalGRPCServers = append(externalGRPCServers, a.externalGRPCServer)
|
||||
}
|
||||
if a.externalGRPCTLSServer != nil {
|
||||
externalGRPCServers = append(externalGRPCServers, a.externalGRPCTLSServer)
|
||||
}
|
||||
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, externalGRPCServers)
|
||||
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul server: %v", err)
|
||||
}
|
||||
|
@ -804,29 +792,35 @@ func (a *Agent) listenAndServeGRPC() error {
|
|||
},
|
||||
a,
|
||||
)
|
||||
a.xdsServer.Register(a.externalGRPCServer)
|
||||
|
||||
// Spawn listeners and register xds servers.
|
||||
// Attempt to spawn listeners
|
||||
var listeners []net.Listener
|
||||
start := func(port_name string, addrs []net.Addr, srv *grpc.Server) error {
|
||||
if len(addrs) < 1 || srv == nil {
|
||||
start := func(port_name string, addrs []net.Addr, tlsConf *tls.Config) error {
|
||||
if len(addrs) < 1 {
|
||||
return nil
|
||||
}
|
||||
a.xdsServer.Register(srv)
|
||||
|
||||
ln, err := a.startListeners(addrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
listeners = append(listeners, ln...)
|
||||
for i := range ln {
|
||||
// Wrap with TLS, if provided.
|
||||
if tlsConf != nil {
|
||||
ln[i] = tls.NewListener(ln[i], tlsConf)
|
||||
}
|
||||
listeners = append(listeners, ln[i])
|
||||
}
|
||||
|
||||
for _, l := range ln {
|
||||
go func(innerL net.Listener) {
|
||||
a.logger.Info("Started gRPC server",
|
||||
a.logger.Info("Started gRPC listeners",
|
||||
"port_name", port_name,
|
||||
"address", innerL.Addr().String(),
|
||||
"network", innerL.Addr().Network(),
|
||||
)
|
||||
err := srv.Serve(innerL)
|
||||
err := a.externalGRPCServer.Serve(innerL)
|
||||
if err != nil {
|
||||
a.logger.Error("gRPC server failed", "port_name", port_name, "error", err)
|
||||
}
|
||||
|
@ -835,13 +829,26 @@ func (a *Agent) listenAndServeGRPC() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := start("grpc", a.config.GRPCAddrs, a.externalGRPCServer); err != nil {
|
||||
closeListeners(listeners)
|
||||
return err
|
||||
// The original grpc port may spawn in either plain-text or TLS mode (for backwards compatibility).
|
||||
// TODO: Simplify this block to only spawn plain-text after 1.14 when deprecated TLS support is removed.
|
||||
if a.config.GRPCPort > 0 {
|
||||
// Only allow the grpc port to spawn TLS connections if the other grpc_tls port is NOT defined.
|
||||
var tlsConf *tls.Config = nil
|
||||
if a.config.GRPCTLSPort <= 0 && a.tlsConfigurator.GRPCServerUseTLS() {
|
||||
a.logger.Warn("deprecated gRPC TLS configuration detected. Consider using `ports.grpc_tls` instead")
|
||||
tlsConf = a.tlsConfigurator.IncomingGRPCConfig()
|
||||
}
|
||||
if err := start("grpc", a.config.GRPCAddrs, tlsConf); err != nil {
|
||||
closeListeners(listeners)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := start("grpc_tls", a.config.GRPCTLSAddrs, a.externalGRPCTLSServer); err != nil {
|
||||
closeListeners(listeners)
|
||||
return err
|
||||
// Only allow grpc_tls to spawn with a TLS listener.
|
||||
if a.config.GRPCTLSPort > 0 {
|
||||
if err := start("grpc_tls", a.config.GRPCTLSAddrs, a.tlsConfigurator.IncomingGRPCConfig()); err != nil {
|
||||
closeListeners(listeners)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1535,9 +1542,6 @@ func (a *Agent) ShutdownAgent() error {
|
|||
if a.externalGRPCServer != nil {
|
||||
a.externalGRPCServer.Stop()
|
||||
}
|
||||
if a.externalGRPCTLSServer != nil {
|
||||
a.externalGRPCTLSServer.Stop()
|
||||
}
|
||||
|
||||
// Stop the proxy config manager
|
||||
if a.proxyConfig != nil {
|
||||
|
@ -3966,9 +3970,8 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
|||
return fmt.Errorf("Failed reloading tls configuration: %s", err)
|
||||
}
|
||||
|
||||
// Setup the external GRPC servers.
|
||||
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers(
|
||||
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
|
||||
// Setup the external GRPC server.
|
||||
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"))
|
||||
|
||||
// Reload service/check definitions and metadata.
|
||||
if err := a.loadServices(newCfg, snap); err != nil {
|
||||
|
|
|
@ -48,8 +48,16 @@ type XDSSelf struct {
|
|||
// Port could be used for either TLS or plain-text communication
|
||||
// up through version 1.14. In order to maintain backwards-compatibility,
|
||||
// Port will now default to TLS and fallback to the standard port value.
|
||||
Port int
|
||||
PortTLS int
|
||||
// DEPRECATED: Use Ports field instead
|
||||
Port int
|
||||
Ports GRPCPorts
|
||||
}
|
||||
|
||||
// GRPCPorts is used to hold the external GRPC server's port numbers.
|
||||
type GRPCPorts struct {
|
||||
// Technically, this port is not always plain-text as of 1.14, but will be in a future release.
|
||||
Plaintext int
|
||||
TLS int
|
||||
}
|
||||
|
||||
func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -83,11 +91,14 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
|
|||
"envoy": proxysupport.EnvoyVersions,
|
||||
},
|
||||
// Prefer the TLS port. See comment on the XDSSelf struct for details.
|
||||
Port: s.agent.config.GRPCTLSPort,
|
||||
PortTLS: s.agent.config.GRPCTLSPort,
|
||||
Port: s.agent.config.GRPCTLSPort,
|
||||
Ports: GRPCPorts{
|
||||
Plaintext: s.agent.config.GRPCPort,
|
||||
TLS: s.agent.config.GRPCTLSPort,
|
||||
},
|
||||
}
|
||||
// Fallback to standard port if TLS is not enabled.
|
||||
if xds.PortTLS <= 0 {
|
||||
if s.agent.config.GRPCTLSPort <= 0 {
|
||||
xds.Port = s.agent.config.GRPCPort
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1502,7 +1502,8 @@ func TestAgent_Self(t *testing.T) {
|
|||
map[string][]string{"envoy": proxysupport.EnvoyVersions},
|
||||
val.XDS.SupportedProxies,
|
||||
)
|
||||
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.PortTLS)
|
||||
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Ports.TLS)
|
||||
require.Equal(t, a.Config.GRPCPort, val.XDS.Ports.Plaintext)
|
||||
if tc.grpcTLS {
|
||||
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Port)
|
||||
} else {
|
||||
|
|
|
@ -552,7 +552,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
|
|||
deps := newDefaultDeps(t, conf1)
|
||||
deps.Logger = logger
|
||||
|
||||
s1, err := NewServer(conf1, deps, []*grpc.Server{grpc.NewServer()})
|
||||
s1, err := NewServer(conf1, deps, grpc.NewServer())
|
||||
require.NoError(t, err)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
|
|
@ -580,9 +580,7 @@ func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
|
|||
require.NoError(t, acceptingServer.Shutdown())
|
||||
|
||||
// Have to manually shut down the gRPC server otherwise it stays bound to the port.
|
||||
for i := range acceptingServer.externalGRPCServers {
|
||||
acceptingServer.externalGRPCServers[i].Stop()
|
||||
}
|
||||
acceptingServer.externalGRPCServer.Stop()
|
||||
|
||||
// Restart the server by re-using the previous acceptor's data directory and node id.
|
||||
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
|
||||
|
@ -1492,9 +1490,7 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
|
|||
|
||||
testutil.RunStep(t, "updated server addresses are picked up by the leader", func(t *testing.T) {
|
||||
// force close the acceptor's gRPC server so the dialier retries with a new address.
|
||||
for i := range acceptor.externalGRPCServers {
|
||||
acceptor.externalGRPCServers[i].Stop()
|
||||
}
|
||||
acceptor.externalGRPCServer.Stop()
|
||||
|
||||
clone := proto.Clone(p.Peering)
|
||||
updated := clone.(*pbpeering.Peering)
|
||||
|
|
|
@ -1543,7 +1543,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
|
|||
deps := newDefaultDeps(t, config)
|
||||
deps.Logger = logger
|
||||
|
||||
srv, err := NewServer(config, deps, []*grpc.Server{grpc.NewServer()})
|
||||
srv, err := NewServer(config, deps, grpc.NewServer())
|
||||
require.NoError(t, err)
|
||||
defer srv.Shutdown()
|
||||
|
||||
|
|
|
@ -253,9 +253,9 @@ type Server struct {
|
|||
// enable RPC forwarding.
|
||||
externalConnectCAServer *connectca.Server
|
||||
|
||||
// externalGRPCServers has all gRPC servers exposed on the dedicated gRPC ports, as
|
||||
// externalGRPCServers has a gRPC server exposed on the dedicated gRPC ports, as
|
||||
// opposed to the multiplexed "server" port which is served by grpcHandler.
|
||||
externalGRPCServers []*grpc.Server
|
||||
externalGRPCServer *grpc.Server
|
||||
|
||||
// router is used to map out Consul servers in the WAN and in Consul
|
||||
// Enterprise user-defined areas.
|
||||
|
@ -388,7 +388,7 @@ type connHandler interface {
|
|||
|
||||
// NewServer is used to construct a new Consul server from the configuration
|
||||
// and extra options, potentially returning an error.
|
||||
func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*Server, error) {
|
||||
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
|
||||
logger := flat.Logger
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
|
@ -434,7 +434,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
reconcileCh: make(chan serf.Member, reconcileChSize),
|
||||
router: flat.Router,
|
||||
tlsConfigurator: flat.TLSConfigurator,
|
||||
externalGRPCServers: externalGRPCServers,
|
||||
externalGRPCServer: externalGRPCServer,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
sessionTimers: NewSessionTimers(),
|
||||
tombstoneGC: gc,
|
||||
|
@ -681,13 +681,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
|
||||
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
// Helper function for registering to all GRPC servers
|
||||
registerGrpc := func(g GRPCService) {
|
||||
for _, srv := range s.externalGRPCServers {
|
||||
g.Register(srv)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize external gRPC server - register services on external gRPC server.
|
||||
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
|
||||
ACLsEnabled: s.config.ACLsEnabled,
|
||||
|
@ -705,7 +698,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
PrimaryDatacenter: s.config.PrimaryDatacenter,
|
||||
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
|
||||
})
|
||||
registerGrpc(s.externalACLServer)
|
||||
s.externalACLServer.Register(externalGRPCServer)
|
||||
|
||||
s.externalConnectCAServer = connectca.NewServer(connectca.Config{
|
||||
Publisher: s.publisher,
|
||||
|
@ -718,7 +711,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
},
|
||||
ConnectEnabled: s.config.ConnectEnabled,
|
||||
})
|
||||
registerGrpc(s.externalConnectCAServer)
|
||||
s.externalConnectCAServer.Register(externalGRPCServer)
|
||||
|
||||
dataplaneServer := dataplane.NewServer(dataplane.Config{
|
||||
GetStore: func() dataplane.StateStore { return s.FSM().State() },
|
||||
|
@ -726,14 +719,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
ACLResolver: s.ACLResolver,
|
||||
Datacenter: s.config.Datacenter,
|
||||
})
|
||||
registerGrpc(dataplaneServer)
|
||||
dataplaneServer.Register(externalGRPCServer)
|
||||
|
||||
serverDiscoveryServer := serverdiscovery.NewServer(serverdiscovery.Config{
|
||||
Publisher: s.publisher,
|
||||
ACLResolver: s.ACLResolver,
|
||||
Logger: logger.Named("grpc-api.server-discovery"),
|
||||
})
|
||||
registerGrpc(serverDiscoveryServer)
|
||||
serverDiscoveryServer.Register(externalGRPCServer)
|
||||
|
||||
s.peerStreamTracker = peerstream.NewTracker()
|
||||
s.peeringBackend = NewPeeringBackend(s)
|
||||
|
@ -754,7 +747,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
|
|||
},
|
||||
})
|
||||
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
|
||||
registerGrpc(s.peerStreamServer)
|
||||
s.peerStreamServer.Register(externalGRPCServer)
|
||||
|
||||
// Initialize internal gRPC server.
|
||||
//
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"net"
|
||||
|
@ -218,9 +219,10 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
|
|||
var dir string
|
||||
var srv *Server
|
||||
|
||||
var config *Config
|
||||
var deps Deps
|
||||
// Retry added to avoid cases where bind addr is already in use
|
||||
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
|
||||
var config *Config
|
||||
dir, config = testServerConfig(t)
|
||||
for _, fn := range configOpts {
|
||||
fn(config)
|
||||
|
@ -234,7 +236,8 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
|
|||
config.ACLResolverSettings.EnterpriseMeta = *config.AgentEnterpriseMeta()
|
||||
|
||||
var err error
|
||||
srv, err = newServer(t, config)
|
||||
deps = newDefaultDeps(t, config)
|
||||
srv, err = newServerWithDeps(t, config, deps)
|
||||
if err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -245,13 +248,18 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
|
|||
// Normally the gRPC server listener is created at the agent level and
|
||||
// passed down into the Server creation.
|
||||
externalGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)
|
||||
|
||||
ln, err := net.Listen("tcp", externalGRPCAddr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wrap the listener with TLS
|
||||
if deps.TLSConfigurator.GRPCServerUseTLS() {
|
||||
ln = tls.NewListener(ln, deps.TLSConfigurator.IncomingGRPCConfig())
|
||||
}
|
||||
|
||||
go func() {
|
||||
_ = srv.externalGRPCServers[0].Serve(ln)
|
||||
_ = srv.externalGRPCServer.Serve(ln)
|
||||
}()
|
||||
t.Cleanup(srv.externalGRPCServers[0].Stop)
|
||||
t.Cleanup(srv.externalGRPCServer.Stop)
|
||||
}
|
||||
|
||||
return dir, srv
|
||||
|
@ -300,19 +308,8 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
|
|||
oldNotify()
|
||||
}
|
||||
}
|
||||
|
||||
// setup grpc servers
|
||||
srvGRPC, srvGRPCTLS := external.BuildExternalGRPCServers(
|
||||
c.GRPCPort, c.GRPCTLSPort, deps.TLSConfigurator, deps.Logger)
|
||||
var grpcServers []*grpc.Server
|
||||
if srvGRPC != nil {
|
||||
grpcServers = append(grpcServers, srvGRPC)
|
||||
}
|
||||
if srvGRPCTLS != nil {
|
||||
grpcServers = append(grpcServers, srvGRPCTLS)
|
||||
}
|
||||
|
||||
srv, err := NewServer(c, deps, grpcServers)
|
||||
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"))
|
||||
srv, err := NewServer(c, deps, grpcServer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1219,7 +1216,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
|
||||
s1, err := NewServer(conf, deps, grpc.NewServer())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1257,7 +1254,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
|
||||
s2, err := NewServer(conf, deps, grpc.NewServer())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1291,7 +1288,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
|
|||
deps := newDefaultDeps(t, conf)
|
||||
deps.NewRequestRecorderFunc = nil
|
||||
|
||||
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
|
||||
s1, err := NewServer(conf, deps, grpc.NewServer())
|
||||
|
||||
require.Error(t, err, "need err when provider func is nil")
|
||||
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
|
||||
|
@ -1310,7 +1307,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()})
|
||||
s2, err := NewServer(conf, deps, grpc.NewServer())
|
||||
|
||||
require.Error(t, err, "need err when RequestRecorder is nil")
|
||||
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
|
||||
|
|
|
@ -6,17 +6,14 @@ import (
|
|||
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// NewServer constructs a gRPC server for the external gRPC port, to which
|
||||
// handlers can be registered.
|
||||
func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.Server {
|
||||
func NewServer(logger agentmiddleware.Logger) *grpc.Server {
|
||||
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)
|
||||
|
||||
opts := []grpc.ServerOption{
|
||||
|
@ -36,38 +33,5 @@ func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.S
|
|||
MinTime: 15 * time.Second,
|
||||
}),
|
||||
}
|
||||
if tls != nil && tls.GRPCServerUseTLS() {
|
||||
creds := credentials.NewTLS(tls.IncomingGRPCConfig())
|
||||
opts = append(opts, grpc.Creds(creds))
|
||||
}
|
||||
return grpc.NewServer(opts...)
|
||||
}
|
||||
|
||||
// BuildExternalGRPCServers constructs two gRPC servers for the external gRPC ports.
|
||||
// This function exists because behavior for the original `ports.grpc` is dependent on
|
||||
// whether the new `ports.grpc_tls` is defined. This behavior should be simplified in
|
||||
// a future release so that the `ports.grpc` is always plain-text and not dependent on
|
||||
// the `ports.grpc_tls` configuration.
|
||||
func BuildExternalGRPCServers(grpcPort int, grpcTLSPort int, t *tlsutil.Configurator, l hclog.InterceptLogger) (grpc, grpcTLS *grpc.Server) {
|
||||
if grpcPort > 0 {
|
||||
// TODO: remove this deprecated behavior in a future version and only support plain-text for this port.
|
||||
if grpcTLSPort > 0 {
|
||||
// Use plain-text if the new grpc_tls port is configured.
|
||||
grpc = NewServer(l.Named("grpc.external"), nil)
|
||||
} else {
|
||||
// Otherwise, check TLS configuration to determine whether to encrypt (for backwards compatibility).
|
||||
grpc = NewServer(l.Named("grpc.external"), t)
|
||||
if t != nil && t.GRPCServerUseTLS() {
|
||||
l.Warn("deprecated gRPC TLS configuration detected. Consider using `ports.grpc_tls` instead")
|
||||
}
|
||||
}
|
||||
}
|
||||
if grpcTLSPort > 0 {
|
||||
if t.GRPCServerUseTLS() {
|
||||
grpcTLS = NewServer(l.Named("grpc_tls.external"), t)
|
||||
} else {
|
||||
l.Error("error starting gRPC TLS server", "error", "port is set, but invalid TLS configuration detected")
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1325,7 +1325,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
|
|||
externalGRPCServer := gogrpc.NewServer()
|
||||
|
||||
deps := newDefaultDeps(t, conf)
|
||||
server, err := consul.NewServer(conf, deps, []*gogrpc.Server{externalGRPCServer})
|
||||
server, err := consul.NewServer(conf, deps, externalGRPCServer)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, server.Shutdown())
|
||||
|
|
|
@ -705,20 +705,21 @@ func (c *cmd) lookupXDSPort() (int, string, error) {
|
|||
|
||||
type response struct {
|
||||
XDS struct {
|
||||
Port int
|
||||
PortTLS int
|
||||
Ports struct {
|
||||
Plaintext int
|
||||
TLS int
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var resp response
|
||||
if err := mapstructure.Decode(self, &resp); err == nil && resp.XDS.Port != 0 {
|
||||
// Determine the protocol based on the provided Port matching PortTLS
|
||||
proto := "http://"
|
||||
// TODO: Simplify this check after 1.14 when Port is guaranteed to be plain-text.
|
||||
if resp.XDS.Port == resp.XDS.PortTLS {
|
||||
proto = "https://"
|
||||
if err := mapstructure.Decode(self, &resp); err == nil {
|
||||
if resp.XDS.Ports.TLS > 0 {
|
||||
return resp.XDS.Ports.TLS, "https://", nil
|
||||
}
|
||||
if resp.XDS.Ports.Plaintext > 0 {
|
||||
return resp.XDS.Ports.Plaintext, "http://", nil
|
||||
}
|
||||
return resp.XDS.Port, proto, nil
|
||||
}
|
||||
|
||||
// Fallback to old API for the case where a new consul CLI is being used with
|
||||
|
|
|
@ -117,8 +117,8 @@ type generateConfigTestCase struct {
|
|||
Files map[string]string
|
||||
ProxyConfig map[string]interface{}
|
||||
NamespacesEnabled bool
|
||||
XDSPort int // only used for testing custom-configured grpc port
|
||||
AgentSelf110 bool // fake the agent API from versions v1.10 and earlier
|
||||
XDSPorts agent.GRPCPorts // only used for testing custom-configured grpc port
|
||||
AgentSelf110 bool // fake the agent API from versions v1.10 and earlier
|
||||
WantArgs BootstrapTplArgs
|
||||
WantErr string
|
||||
}
|
||||
|
@ -447,9 +447,9 @@ func TestGenerateConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
Name: "xds-addr-config",
|
||||
Flags: []string{"-proxy-id", "test-proxy"},
|
||||
XDSPort: 9999,
|
||||
Name: "xds-addr-config",
|
||||
Flags: []string{"-proxy-id", "test-proxy"},
|
||||
XDSPorts: agent.GRPCPorts{Plaintext: 9999, TLS: 0},
|
||||
WantArgs: BootstrapTplArgs{
|
||||
ProxyCluster: "test-proxy",
|
||||
ProxyID: "test-proxy",
|
||||
|
@ -470,10 +470,36 @@ func TestGenerateConfig(t *testing.T) {
|
|||
PrometheusScrapePath: "/metrics",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "grpc-tls-addr-config",
|
||||
Flags: []string{"-proxy-id", "test-proxy"},
|
||||
XDSPorts: agent.GRPCPorts{Plaintext: 9997, TLS: 9998},
|
||||
AgentSelf110: false,
|
||||
WantArgs: BootstrapTplArgs{
|
||||
ProxyCluster: "test-proxy",
|
||||
ProxyID: "test-proxy",
|
||||
// We don't know this til after the lookup so it will be empty in the
|
||||
// initial args call we are testing here.
|
||||
ProxySourceService: "",
|
||||
// Should resolve IP, note this might not resolve the same way
|
||||
// everywhere which might make this test brittle but not sure what else
|
||||
// to do.
|
||||
GRPC: GRPC{
|
||||
AgentAddress: "127.0.0.1",
|
||||
AgentPort: "9998",
|
||||
AgentTLS: true,
|
||||
},
|
||||
AdminAccessLogPath: "/dev/null",
|
||||
AdminBindAddress: "127.0.0.1",
|
||||
AdminBindPort: "19000",
|
||||
LocalAgentClusterName: xds.LocalAgentClusterName,
|
||||
PrometheusScrapePath: "/metrics",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "deprecated-grpc-addr-config",
|
||||
Flags: []string{"-proxy-id", "test-proxy"},
|
||||
XDSPort: 9999,
|
||||
XDSPorts: agent.GRPCPorts{Plaintext: 9999, TLS: 0},
|
||||
AgentSelf110: true,
|
||||
WantArgs: BootstrapTplArgs{
|
||||
ProxyCluster: "test-proxy",
|
||||
|
@ -1138,7 +1164,7 @@ func testMockAgent(tc generateConfigTestCase) http.HandlerFunc {
|
|||
case strings.Contains(r.URL.Path, "/agent/service"):
|
||||
testMockAgentProxyConfig(tc.ProxyConfig, tc.NamespacesEnabled)(w, r)
|
||||
case strings.Contains(r.URL.Path, "/agent/self"):
|
||||
testMockAgentSelf(tc.XDSPort, tc.AgentSelf110)(w, r)
|
||||
testMockAgentSelf(tc.XDSPorts, tc.AgentSelf110)(w, r)
|
||||
case strings.Contains(r.URL.Path, "/catalog/node-services"):
|
||||
testMockCatalogNodeServiceList()(w, r)
|
||||
default:
|
||||
|
@ -1378,7 +1404,7 @@ func TestEnvoyCommand_canBindInternal(t *testing.T) {
|
|||
|
||||
// testMockAgentSelf returns an empty /v1/agent/self response except GRPC
|
||||
// port is filled in to match the given wantXDSPort argument.
|
||||
func testMockAgentSelf(wantXDSPort int, agentSelf110 bool) http.HandlerFunc {
|
||||
func testMockAgentSelf(wantXDSPorts agent.GRPCPorts, agentSelf110 bool) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
resp := agent.Self{
|
||||
Config: map[string]interface{}{
|
||||
|
@ -1388,10 +1414,17 @@ func testMockAgentSelf(wantXDSPort int, agentSelf110 bool) http.HandlerFunc {
|
|||
|
||||
if agentSelf110 {
|
||||
resp.DebugConfig = map[string]interface{}{
|
||||
"GRPCPort": wantXDSPort,
|
||||
"GRPCPort": wantXDSPorts.Plaintext,
|
||||
}
|
||||
} else {
|
||||
resp.XDS = &agent.XDSSelf{Port: wantXDSPort}
|
||||
resp.XDS = &agent.XDSSelf{
|
||||
// The deprecated Port field should default to TLS if it's available.
|
||||
Port: wantXDSPorts.TLS,
|
||||
Ports: wantXDSPorts,
|
||||
}
|
||||
if wantXDSPorts.TLS <= 0 {
|
||||
resp.XDS.Port = wantXDSPorts.Plaintext
|
||||
}
|
||||
}
|
||||
|
||||
selfJSON, err := json.Marshal(resp)
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
{
|
||||
"admin": {
|
||||
"access_log_path": "/dev/null",
|
||||
"address": {
|
||||
"socket_address": {
|
||||
"address": "127.0.0.1",
|
||||
"port_value": 19000
|
||||
}
|
||||
}
|
||||
},
|
||||
"node": {
|
||||
"cluster": "test",
|
||||
"id": "test-proxy",
|
||||
"metadata": {
|
||||
"namespace": "default",
|
||||
"partition": "default"
|
||||
}
|
||||
},
|
||||
"layered_runtime": {
|
||||
"layers": [
|
||||
{
|
||||
"name": "base",
|
||||
"static_layer": {
|
||||
"re2.max_program_size.error_level": 1048576
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"static_resources": {
|
||||
"clusters": [
|
||||
{
|
||||
"name": "local_agent",
|
||||
"ignore_health_on_host_removal": false,
|
||||
"connect_timeout": "1s",
|
||||
"type": "STATIC",
|
||||
"transport_socket": {
|
||||
"name": "tls",
|
||||
"typed_config": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
|
||||
"common_tls_context": {
|
||||
"validation_context": {
|
||||
"trusted_ca": {
|
||||
"inline_string": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"http2_protocol_options": {},
|
||||
"loadAssignment": {
|
||||
"clusterName": "local_agent",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socket_address": {
|
||||
"address": "127.0.0.1",
|
||||
"port_value": 9998
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"stats_config": {
|
||||
"stats_tags": [
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.custom_hash"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.service_subset"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.service"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.namespace"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:([^.]+)\\.)?[^.]+\\.internal[^.]*\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.partition"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.datacenter"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.([^.]+\\.(?:[^.]+\\.)?([^.]+)\\.external\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.peer"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.routing_type"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)",
|
||||
"tag_name": "consul.destination.trust_domain"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.destination.target"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)",
|
||||
"tag_name": "consul.destination.full_target"
|
||||
},
|
||||
{
|
||||
"regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.(([^.]+)(?:\\.[^.]+)?(?:\\.[^.]+)?\\.[^.]+\\.)",
|
||||
"tag_name": "consul.upstream.service"
|
||||
},
|
||||
{
|
||||
"regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.[^.]+)?\\.([^.]+)\\.)",
|
||||
"tag_name": "consul.upstream.datacenter"
|
||||
},
|
||||
{
|
||||
"regex": "^(?:tcp|http)\\.upstream_peered\\.([^.]+(?:\\.[^.]+)?\\.([^.]+)\\.)",
|
||||
"tag_name": "consul.upstream.peer"
|
||||
},
|
||||
{
|
||||
"regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.([^.]+(?:\\.([^.]+))?(?:\\.[^.]+)?\\.[^.]+\\.)",
|
||||
"tag_name": "consul.upstream.namespace"
|
||||
},
|
||||
{
|
||||
"regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.([^.]+))?\\.[^.]+\\.)",
|
||||
"tag_name": "consul.upstream.partition"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.custom_hash"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.service_subset"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.service"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.namespace"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.datacenter"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.routing_type"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)",
|
||||
"tag_name": "consul.trust_domain"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)",
|
||||
"tag_name": "consul.target"
|
||||
},
|
||||
{
|
||||
"regex": "^cluster\\.(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)",
|
||||
"tag_name": "consul.full_target"
|
||||
},
|
||||
{
|
||||
"tag_name": "local_cluster",
|
||||
"fixed_value": "test"
|
||||
},
|
||||
{
|
||||
"tag_name": "consul.source.service",
|
||||
"fixed_value": "test"
|
||||
},
|
||||
{
|
||||
"tag_name": "consul.source.namespace",
|
||||
"fixed_value": "default"
|
||||
},
|
||||
{
|
||||
"tag_name": "consul.source.partition",
|
||||
"fixed_value": "default"
|
||||
},
|
||||
{
|
||||
"tag_name": "consul.source.datacenter",
|
||||
"fixed_value": "dc1"
|
||||
}
|
||||
],
|
||||
"use_all_default_tags": true
|
||||
},
|
||||
"dynamic_resources": {
|
||||
"lds_config": {
|
||||
"ads": {},
|
||||
"resource_api_version": "V3"
|
||||
},
|
||||
"cds_config": {
|
||||
"ads": {},
|
||||
"resource_api_version": "V3"
|
||||
},
|
||||
"ads_config": {
|
||||
"api_type": "DELTA_GRPC",
|
||||
"transport_api_version": "V3",
|
||||
"grpc_services": {
|
||||
"initial_metadata": [
|
||||
{
|
||||
"key": "x-consul-token",
|
||||
"value": ""
|
||||
}
|
||||
],
|
||||
"envoy_grpc": {
|
||||
"cluster_name": "local_agent"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue