diff --git a/agent/agent.go b/agent/agent.go index 9c63eb06b3..d1927f2d9e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -18,7 +18,6 @@ import ( "time" "github.com/hashicorp/consul/agent/dns" - "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" @@ -29,14 +28,12 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/ae" - autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/local" - "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" @@ -156,7 +153,8 @@ type notifier interface { // mode, it runs a full Consul server. In client-only mode, it only forwards // requests to other Consul servers. type Agent struct { - autoConf *autoconf.AutoConfig + // TODO: remove fields that are already in BaseDeps + baseDeps BaseDeps // config is the agent configuration. config *config.RuntimeConfig @@ -164,9 +162,6 @@ type Agent struct { // Used for writing our logs logger hclog.InterceptLogger - // In-memory sink used for collecting metrics - MemSink MetricsHandler - // delegate is either a *consul.Server or *consul.Client // depending on the configuration delegate delegate @@ -295,12 +290,6 @@ type Agent struct { // IP. httpConnLimiter connlimit.Limiter - // Connection Pool - connPool *pool.ConnPool - - // Shared RPC Router - router *router.Router - // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent } @@ -337,16 +326,12 @@ func New(bd BaseDeps) (*Agent, error) { shutdownCh: make(chan struct{}), endpoints: make(map[string]string), - // TODO: store the BaseDeps instead of copying them over to Agent + baseDeps: bd, tokens: bd.Tokens, logger: bd.Logger, tlsConfigurator: bd.TLSConfigurator, config: bd.RuntimeConfig, cache: bd.Cache, - MemSink: bd.MetricsHandler, - connPool: bd.ConnPool, - autoConf: bd.AutoConfig, - router: bd.Router, } a.serviceManager = NewServiceManager(&a) @@ -407,7 +392,7 @@ func (a *Agent) Start(ctx context.Context) error { // This needs to be done early on as it will potentially alter the configuration // and then how other bits are brought up - c, err := a.autoConf.InitialConfiguration(ctx) + c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx) if err != nil { return err } @@ -454,23 +439,15 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("failed to start Consul enterprise component: %v", err) } - options := []consul.ConsulOption{ - consul.WithLogger(a.logger), - consul.WithTokenStore(a.tokens), - consul.WithTLSConfigurator(a.tlsConfigurator), - consul.WithConnectionPool(a.connPool), - consul.WithRouter(a.router), - } - // Setup either the client or the server. if c.ServerMode { - server, err := consul.NewServer(consulCfg, options...) + server, err := consul.NewServer(consulCfg, a.baseDeps.Deps) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } a.delegate = server } else { - client, err := consul.NewClient(consulCfg, options...) + client, err := consul.NewClient(consulCfg, a.baseDeps.Deps) if err != nil { return fmt.Errorf("Failed to start Consul client: %v", err) } @@ -487,7 +464,7 @@ func (a *Agent) Start(ctx context.Context) error { a.State.Delegate = a.delegate a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger - if err := a.autoConf.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { + if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err) } a.serviceManager.Start() @@ -1297,7 +1274,7 @@ func (a *Agent) ShutdownAgent() error { // this would be cancelled anyways (by the closing of the shutdown ch) but // this should help them to be stopped more quickly - a.autoConf.Stop() + a.baseDeps.AutoConfig.Stop() // Stop the service manager (must happen before we take the stateLock to avoid deadlock) if a.serviceManager != nil { @@ -3472,7 +3449,7 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) { // all services, checks, tokens, metadata, dnsServer configs, etc. // It will also reload all ongoing watches. func (a *Agent) ReloadConfig() error { - newCfg, err := a.autoConf.ReadConfig() + newCfg, err := a.baseDeps.AutoConfig.ReadConfig() if err != nil { return err } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 1457d5093a..0ff5dafbc8 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -152,7 +152,7 @@ func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) ( handler.ServeHTTP(resp, req) return nil, nil } - return s.agent.MemSink.DisplayMetrics(resp, req) + return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/agent/agent_test.go b/agent/agent_test.go index dded499ee2..3b67e4009f 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4607,7 +4607,7 @@ func TestSharedRPCRouter(t *testing.T) { testrpc.WaitForTestAgent(t, srv.RPC, "dc1") - mgr, server := srv.Agent.router.FindLANRoute() + mgr, server := srv.Agent.baseDeps.Router.FindLANRoute() require.NotNil(t, mgr) require.NotNil(t, server) @@ -4619,7 +4619,7 @@ func TestSharedRPCRouter(t *testing.T) { testrpc.WaitForTestAgent(t, client.RPC, "dc1") - mgr, server = client.Agent.router.FindLANRoute() + mgr, server = client.Agent.baseDeps.Router.FindLANRoute() require.NotNil(t, mgr) require.NotNil(t, server) } diff --git a/agent/consul/client.go b/agent/consul/client.go index c2e0806379..b4cf90759b 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -22,18 +22,6 @@ import ( ) const ( - // clientRPCConnMaxIdle controls how long we keep an idle connection - // open to a server. 127s was chosen as the first prime above 120s - // (arbitrarily chose to use a prime) with the intent of reusing - // connections who are used by once-a-minute cron(8) jobs *and* who - // use a 60s jitter window (e.g. in vixie cron job execution can - // drift by up to 59s per job, or 119s for a once-a-minute cron job). - clientRPCConnMaxIdle = 127 * time.Second - - // clientMaxStreams controls how many idle streams we keep - // open to a server - clientMaxStreams = 32 - // serfEventBacklog is the maximum number of unprocessed Serf Events // that will be held in queue before new serf events block. A // blocking serf event queue is a bad thing. @@ -89,12 +77,7 @@ type Client struct { } // NewClient creates and returns a Client -func NewClient(config *Config, options ...ConsulOption) (*Client, error) { - flat := flattenConsulOptions(options) - - tlsConfigurator := flat.tlsConfigurator - connPool := flat.connPool - +func NewClient(config *Config, deps Deps) (*Client, error) { if err := config.CheckProtocolVersion(); err != nil { return nil, err } @@ -104,35 +87,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) { if err := config.CheckACL(); err != nil { return nil, err } - if flat.logger == nil { - return nil, fmt.Errorf("logger is required") - } - if flat.router == nil { - return nil, fmt.Errorf("router is required") - } - if connPool == nil { - connPool = &pool.ConnPool{ - Server: false, - SrcAddr: config.RPCSrcAddr, - Logger: flat.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), - MaxTime: clientRPCConnMaxIdle, - MaxStreams: clientMaxStreams, - TLSConfigurator: tlsConfigurator, - Datacenter: config.Datacenter, - } - } - - logger := flat.logger.NamedIntercept(logging.ConsulClient) - - // Create client c := &Client{ config: config, - connPool: connPool, + connPool: deps.ConnPool, eventCh: make(chan serf.Event, serfEventBacklog), - logger: logger, + logger: deps.Logger.NamedIntercept(logging.ConsulClient), shutdownCh: make(chan struct{}), - tlsConfigurator: tlsConfigurator, + tlsConfigurator: deps.TLSConfigurator, } c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) @@ -164,11 +126,11 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) { return nil, fmt.Errorf("Failed to start lan serf: %v", err) } - if err := flat.router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil { + if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil { c.Shutdown() return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err) } - c.router = flat.router + c.router = deps.Router // Start LAN event handlers after the router is complete since the event // handlers depend on the router and the router depends on Serf. diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index cbbb6c6393..ea0250454f 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -9,8 +9,10 @@ import ( "testing" "time" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -66,22 +68,8 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli if cb != nil { cb(config) } - logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ - Name: config.NodeName, - Level: hclog.Debug, - Output: testutil.NewLogBuffer(t), - }) - tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger) - if err != nil { - t.Fatalf("err: %v", err) - } - - r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)) - client, err := NewClient(config, - WithLogger(logger), - WithTLSConfigurator(tlsConf), - WithRouter(r)) + client, err := NewClient(config, newDefaultDeps(t, config)) return dir, client, err } @@ -472,18 +460,7 @@ func TestClient_RPC_TLS(t *testing.T) { func newClient(t *testing.T, config *Config) *Client { t.Helper() - c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil) - require.NoError(t, err, "failed to create tls configuration") - - logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ - Level: hclog.Debug, - Output: testutil.NewLogBuffer(t), - }) - r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)) - client, err := NewClient(config, - WithLogger(logger), - WithTLSConfigurator(c), - WithRouter(r)) + client, err := NewClient(config, newDefaultDeps(t, config)) require.NoError(t, err, "failed to create client") t.Cleanup(func() { client.Shutdown() @@ -491,6 +468,39 @@ func newClient(t *testing.T, config *Config) *Client { return client } +func newDefaultDeps(t *testing.T, c *Config) Deps { + t.Helper() + + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ + Name: c.NodeName, + Level: hclog.Debug, + Output: testutil.NewLogBuffer(t), + }) + + tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger) + require.NoError(t, err, "failed to create tls configuration") + + r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter)) + + connPool := &pool.ConnPool{ + Server: false, + SrcAddr: c.RPCSrcAddr, + Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), + MaxTime: 2 * time.Minute, + MaxStreams: 4, + TLSConfigurator: tls, + Datacenter: c.Datacenter, + } + + return Deps{ + Logger: logger, + TLSConfigurator: tls, + Tokens: new(token.Store), + Router: r, + ConnPool: connPool, + } +} + func TestClient_RPC_RateLimit(t *testing.T) { t.Parallel() _, conf1 := testServerConfig(t) diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 2bd30e3350..408664696f 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -9,14 +9,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/go-hclog" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/serf" @@ -1304,15 +1301,11 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) { Level: hclog.Debug, Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)), }) - tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger) - require.NoError(t, err) - rpcRouter := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)) - srv, err := NewServer(config, - WithLogger(logger), - WithTokenStore(new(token.Store)), - WithTLSConfigurator(tlsConf), - WithRouter(rpcRouter)) + deps := newDefaultDeps(t, config) + deps.Logger = logger + + srv, err := NewServer(config, deps) require.NoError(t, err) defer srv.Shutdown() diff --git a/agent/consul/options.go b/agent/consul/options.go index 2650780761..242da3d35f 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -8,50 +8,10 @@ import ( "github.com/hashicorp/go-hclog" ) -type consulOptions struct { - logger hclog.InterceptLogger - tlsConfigurator *tlsutil.Configurator - connPool *pool.ConnPool - tokens *token.Store - router *router.Router -} - -type ConsulOption func(*consulOptions) - -func WithLogger(logger hclog.InterceptLogger) ConsulOption { - return func(opt *consulOptions) { - opt.logger = logger - } -} - -func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption { - return func(opt *consulOptions) { - opt.tlsConfigurator = tlsConfigurator - } -} - -func WithConnectionPool(connPool *pool.ConnPool) ConsulOption { - return func(opt *consulOptions) { - opt.connPool = connPool - } -} - -func WithTokenStore(tokens *token.Store) ConsulOption { - return func(opt *consulOptions) { - opt.tokens = tokens - } -} - -func WithRouter(router *router.Router) ConsulOption { - return func(opt *consulOptions) { - opt.router = router - } -} - -func flattenConsulOptions(options []ConsulOption) consulOptions { - var flat consulOptions - for _, opt := range options { - opt(&flat) - } - return flat +type Deps struct { + Logger hclog.InterceptLogger + TLSConfigurator *tlsutil.Configurator + Tokens *token.Store + Router *router.Router + ConnPool *pool.ConnPool } diff --git a/agent/consul/server.go b/agent/consul/server.go index 7aacdfb9a4..85393ac1ca 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -70,14 +70,6 @@ const ( raftState = "raft/" snapshotsRetained = 2 - // serverRPCCache controls how long we keep an idle connection - // open to a server - serverRPCCache = 2 * time.Minute - - // serverMaxStreams controls how many idle streams we keep - // open to a server - serverMaxStreams = 64 - // raftLogCacheSize is the maximum number of logs to cache in-memory. // This is used to reduce disk I/O for the recently committed entries. raftLogCacheSize = 512 @@ -324,14 +316,8 @@ type connHandler interface { // NewServer is used to construct a new Consul server from the configuration // and extra options, potentially returning an error. -func NewServer(config *Config, options ...ConsulOption) (*Server, error) { - flat := flattenConsulOptions(options) - - logger := flat.logger - tokens := flat.tokens - tlsConfigurator := flat.tlsConfigurator - connPool := flat.connPool - +func NewServer(config *Config, flat Deps) (*Server, error) { + logger := flat.Logger if err := config.CheckProtocolVersion(); err != nil { return nil, err } @@ -341,12 +327,6 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) { if err := config.CheckACL(); err != nil { return nil, err } - if logger == nil { - return nil, fmt.Errorf("logger is required") - } - if flat.router == nil { - return nil, fmt.Errorf("router is required") - } // Check if TLS is enabled if config.CAFile != "" || config.CAPath != "" { @@ -375,36 +355,24 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) { // Create the shutdown channel - this is closed but never written to. shutdownCh := make(chan struct{}) - if connPool == nil { - connPool = &pool.ConnPool{ - Server: true, - SrcAddr: config.RPCSrcAddr, - Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), - MaxTime: serverRPCCache, - MaxStreams: serverMaxStreams, - TLSConfigurator: tlsConfigurator, - Datacenter: config.Datacenter, - } - } - - serverLogger := logger.NamedIntercept(logging.ConsulServer) + serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer) loggers := newLoggerStore(serverLogger) // Create server. s := &Server{ config: config, - tokens: tokens, - connPool: connPool, + tokens: flat.Tokens, + connPool: flat.ConnPool, eventChLAN: make(chan serf.Event, serfEventChSize), eventChWAN: make(chan serf.Event, serfEventChSize), logger: serverLogger, loggers: loggers, leaveCh: make(chan struct{}), reconcileCh: make(chan serf.Member, reconcileChSize), - router: flat.router, + router: flat.Router, rpcServer: rpc.NewServer(), insecureRPCServer: rpc.NewServer(), - tlsConfigurator: tlsConfigurator, + tlsConfigurator: flat.TLSConfigurator, reassertLeaderCh: make(chan chan error), segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), sessionTimers: NewSessionTimers(), diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 00a7e4ea13..928e59257b 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -17,7 +17,6 @@ import ( "github.com/google/tcpproxy" "github.com/hashicorp/consul/agent/connect/ca" - "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/memberlist" @@ -31,7 +30,6 @@ import ( "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "golang.org/x/time/rate" @@ -293,22 +291,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) { } } - logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ - Name: c.NodeName, - Level: hclog.Debug, - Output: testutil.NewLogBuffer(t), - }) - tlsConf, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger) - if err != nil { - return nil, err - } - - rpcRouter := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter)) - srv, err := NewServer(c, - WithLogger(logger), - WithTokenStore(new(token.Store)), - WithTLSConfigurator(tlsConf), - WithRouter(rpcRouter)) + srv, err := NewServer(c, newDefaultDeps(t, c)) if err != nil { return nil, err } @@ -1492,19 +1475,11 @@ func TestServer_CALogging(t *testing.T) { var buf bytes.Buffer logger := testutil.LoggerWithOutput(t, &buf) - c, err := tlsutil.NewConfigurator(conf1.ToTLSUtilConfig(), logger) + deps := newDefaultDeps(t, conf1) + deps.Logger = logger + + s1, err := NewServer(conf1, deps) require.NoError(t, err) - - rpcRouter := router.NewRouter(logger, "dc1", fmt.Sprintf("%s.%s", "nodename", "dc1")) - - s1, err := NewServer(conf1, - WithLogger(logger), - WithTokenStore(new(token.Store)), - WithTLSConfigurator(c), - WithRouter(rpcRouter)) - if err != nil { - t.Fatalf("err: %v", err) - } defer s1.Shutdown() testrpc.WaitForLeader(t, s1.RPC, "dc1") diff --git a/agent/setup.go b/agent/setup.go index d5a2d063ea..18a0be0c38 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -10,6 +10,7 @@ import ( autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/config" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/token" @@ -25,15 +26,12 @@ import ( // has been moved out in front of Agent.New, and we can better see the setup // dependencies. type BaseDeps struct { - Logger hclog.InterceptLogger - TLSConfigurator *tlsutil.Configurator // TODO: use an interface - MetricsHandler MetricsHandler - RuntimeConfig *config.RuntimeConfig - Tokens *token.Store - Cache *cache.Cache - AutoConfig *autoconf.AutoConfig // TODO: use an interface - ConnPool *pool.ConnPool // TODO: use an interface - Router *router.Router + consul.Deps // TODO: un-embed + + RuntimeConfig *config.RuntimeConfig + MetricsHandler MetricsHandler + AutoConfig *autoconf.AutoConfig // TODO: use an interface + Cache *cache.Cache } // MetricsHandler provides an http.Handler for displaying metrics. @@ -120,6 +118,12 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil pool.MaxTime = 2 * time.Minute pool.MaxStreams = 64 } else { + // MaxTime controls how long we keep an idle connection open to a server. + // 127s was chosen as the first prime above 120s + // (arbitrarily chose to use a prime) with the intent of reusing + // connections who are used by once-a-minute cron(8) jobs *and* who + // use a 60s jitter window (e.g. in vixie cron job execution can + // drift by up to 59s per job, or 119s for a once-a-minute cron job). pool.MaxTime = 127 * time.Second pool.MaxStreams = 32 }