From 8b7d669a2731fbf9f9a8b8df8082a81be88d5f01 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Wed, 10 Jun 2020 16:15:32 -0400 Subject: [PATCH] Allow the Agent its its child Client/Server to share a connection pool This is needed so that we can make an AutoConfig RPC at the Agent level prior to creating the Client/Server. --- agent/agent.go | 59 +++++++++++++++++++++++++++++++++++++----- agent/consul/client.go | 30 ++++++++++++++------- agent/consul/server.go | 34 ++++++++++++++++++------ agent/pool/pool.go | 6 ++++- 4 files changed, 105 insertions(+), 24 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ffd92d6bd6..92854464f1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -33,6 +33,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" @@ -313,6 +314,9 @@ type Agent struct { // IP. httpConnLimiter connlimit.Limiter + // Connection Pool + connPool *pool.ConnPool + // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent } @@ -327,6 +331,11 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error) return nil, fmt.Errorf("Must configure a DataDir") } + tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger) + if err != nil { + return nil, err + } + a := Agent{ config: c, checkReapAfter: make(map[structs.CheckID]time.Duration), @@ -347,7 +356,13 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error) endpoints: make(map[string]string), tokens: new(token.Store), logger: logger, + tlsConfigurator: tlsConfigurator, } + err = a.initializeConnectionPool() + if err != nil { + return nil, fmt.Errorf("Failed to initialize the connection pool: %w", err) + } + a.serviceManager = NewServiceManager(&a) if err := a.initializeACLs(); err != nil { @@ -363,6 +378,37 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error) return &a, nil } +func (a *Agent) initializeConnectionPool() error { + var rpcSrcAddr *net.TCPAddr + if !ipaddr.IsAny(a.config.RPCBindAddr) { + rpcSrcAddr = &net.TCPAddr{IP: a.config.RPCBindAddr.IP} + } + + // Ensure we have a log output for the connection pool. + logOutput := a.LogOutput + if logOutput == nil { + logOutput = os.Stderr + } + + pool := &pool.ConnPool{ + Server: a.config.ServerMode, + SrcAddr: rpcSrcAddr, + LogOutput: logOutput, + TLSConfigurator: a.tlsConfigurator, + Datacenter: a.config.Datacenter, + } + if a.config.ServerMode { + pool.MaxTime = 2 * time.Minute + pool.MaxStreams = 64 + } else { + pool.MaxTime = 127 * time.Second + pool.MaxStreams = 32 + } + + a.connPool = pool + return nil +} + // LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config func LocalConfig(cfg *config.RuntimeConfig) local.Config { lc := local.Config{ @@ -438,21 +484,22 @@ func (a *Agent) Start() error { return fmt.Errorf("failed to start Consul enterprise component: %v", err) } - tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), a.logger) - if err != nil { - return err + options := []consul.ConsulOption{ + consul.WithLogger(a.logger), + consul.WithTokenStore(a.tokens), + consul.WithTLSConfigurator(a.tlsConfigurator), + consul.WithConnectionPool(a.connPool), } - a.tlsConfigurator = tlsConfigurator // Setup either the client or the server. if c.ServerMode { - server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens, a.tlsConfigurator) + server, err := consul.NewServerWithOptions(consulCfg, options...) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } a.delegate = server } else { - client, err := consul.NewClientLogger(consulCfg, a.logger, a.tlsConfigurator) + client, err := consul.NewClientWithOptions(consulCfg, options...) if err != nil { return fmt.Errorf("Failed to start Consul client: %v", err) } diff --git a/agent/consul/client.go b/agent/consul/client.go index 2d4eee61f2..f3af5af009 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -101,7 +101,13 @@ func NewClient(config *Config) (*Client, error) { return NewClientLogger(config, nil, c) } -func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) { +func NewClientWithOptions(config *Config, options ...ConsulOption) (*Client, error) { + flat := flattenConsulOptions(options) + + logger := flat.logger + tlsConfigurator := flat.tlsConfigurator + connPool := flat.connPool + // Check the protocol version if err := config.CheckProtocolVersion(); err != nil { return nil, err @@ -130,14 +136,16 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat }) } - connPool := &pool.ConnPool{ - Server: false, - SrcAddr: config.RPCSrcAddr, - LogOutput: config.LogOutput, - MaxTime: clientRPCConnMaxIdle, - MaxStreams: clientMaxStreams, - TLSConfigurator: tlsConfigurator, - Datacenter: config.Datacenter, + if connPool == nil { + connPool = &pool.ConnPool{ + Server: false, + SrcAddr: config.RPCSrcAddr, + LogOutput: config.LogOutput, + MaxTime: clientRPCConnMaxIdle, + MaxStreams: clientMaxStreams, + TLSConfigurator: tlsConfigurator, + Datacenter: config.Datacenter, + } } // Create client @@ -202,6 +210,10 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat return c, nil } +func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) { + return NewClientWithOptions(config, WithLogger(logger), WithTLSConfigurator(tlsConfigurator)) +} + // Shutdown is used to shutdown the client func (c *Client) Shutdown() error { c.logger.Info("shutting down client") diff --git a/agent/consul/server.go b/agent/consul/server.go index dee3ca0e89..6bef3d130d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -322,6 +322,22 @@ func NewServer(config *Config) (*Server, error) { // NewServerLogger is used to construct a new Consul server from the // configuration, potentially returning an error func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token.Store, tlsConfigurator *tlsutil.Configurator) (*Server, error) { + return NewServerWithOptions(config, + WithLogger(logger), + WithTokenStore(tokens), + WithTLSConfigurator(tlsConfigurator)) +} + +// NewServerWithOptions is used to construct a new Consul server from the configuration +// and extra options, potentially returning an error +func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, error) { + flat := flattenConsulOptions(options) + + logger := flat.logger + tokens := flat.tokens + tlsConfigurator := flat.tlsConfigurator + connPool := flat.connPool + // Check the protocol version. if err := config.CheckProtocolVersion(); err != nil { return nil, err @@ -376,14 +392,16 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token // Create the shutdown channel - this is closed but never written to. shutdownCh := make(chan struct{}) - connPool := &pool.ConnPool{ - Server: true, - SrcAddr: config.RPCSrcAddr, - LogOutput: config.LogOutput, - MaxTime: serverRPCCache, - MaxStreams: serverMaxStreams, - TLSConfigurator: tlsConfigurator, - Datacenter: config.Datacenter, + if connPool == nil { + connPool = &pool.ConnPool{ + Server: true, + SrcAddr: config.RPCSrcAddr, + LogOutput: config.LogOutput, + MaxTime: serverRPCCache, + MaxStreams: serverMaxStreams, + TLSConfigurator: tlsConfigurator, + Datacenter: config.Datacenter, + } } serverLogger := logger.NamedIntercept(logging.ConsulServer) diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 7f14faa38e..487e798086 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -466,7 +466,11 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr) (*Conn, conf.LogOutput = p.LogOutput // Create a multiplexed session - session, _ := yamux.Client(conn, conf) + session, err := yamux.Client(conn, conf) + if err != nil { + conn.Close() + return nil, fmt.Errorf("Failed to create yamux client: %w", err) + } // Wrap the connection c := &Conn{