agent/consul: pass dependencies directly from agent

In an upcoming change we will need to pass a grpc.ClientConnPool from
BaseDeps into Server. While looking at that change I noticed all of the
existing consulOption fields are already on BaseDeps.

Instead of duplicating the fields, we can create a struct used by
agent/consul, and use that struct in BaseDeps. This allows us to pass
along dependencies without translating them into different
representations.

I also looked at moving all of BaseDeps in agent/consul, however that
created some circular imports. Resolving those cycles wouldn't be too
bad (it was only an error in agent/consul being imported from
cache-types), however this change seems a little better by starting to
introduce some structure to BaseDeps.

This change is also a small step in reducing the scope of Agent.

Also remove some constants that were only used by tests, and move the
relevant comment to where the live configuration is set.

Removed some validation from NewServer and NewClient, as these are not
really runtime errors. They would be code errors, which will cause a
panic anyway, so no reason to handle them specially here.
This commit is contained in:
Daniel Nephin 2020-09-14 18:31:07 -04:00
parent 3aa9bd4c23
commit cdd392d77f
10 changed files with 90 additions and 241 deletions

View File

@ -18,7 +18,6 @@ import (
"time"
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
@ -29,14 +28,12 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
@ -156,7 +153,8 @@ type notifier interface {
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
autoConf *autoconf.AutoConfig
// TODO: remove fields that are already in BaseDeps
baseDeps BaseDeps
// config is the agent configuration.
config *config.RuntimeConfig
@ -164,9 +162,6 @@ type Agent struct {
// Used for writing our logs
logger hclog.InterceptLogger
// In-memory sink used for collecting metrics
MemSink MetricsHandler
// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate delegate
@ -295,12 +290,6 @@ type Agent struct {
// IP.
httpConnLimiter connlimit.Limiter
// Connection Pool
connPool *pool.ConnPool
// Shared RPC Router
router *router.Router
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
@ -337,16 +326,12 @@ func New(bd BaseDeps) (*Agent, error) {
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
// TODO: store the BaseDeps instead of copying them over to Agent
baseDeps: bd,
tokens: bd.Tokens,
logger: bd.Logger,
tlsConfigurator: bd.TLSConfigurator,
config: bd.RuntimeConfig,
cache: bd.Cache,
MemSink: bd.MetricsHandler,
connPool: bd.ConnPool,
autoConf: bd.AutoConfig,
router: bd.Router,
}
a.serviceManager = NewServiceManager(&a)
@ -407,7 +392,7 @@ func (a *Agent) Start(ctx context.Context) error {
// This needs to be done early on as it will potentially alter the configuration
// and then how other bits are brought up
c, err := a.autoConf.InitialConfiguration(ctx)
c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx)
if err != nil {
return err
}
@ -454,23 +439,15 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}
options := []consul.ConsulOption{
consul.WithLogger(a.logger),
consul.WithTokenStore(a.tokens),
consul.WithTLSConfigurator(a.tlsConfigurator),
consul.WithConnectionPool(a.connPool),
consul.WithRouter(a.router),
}
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServer(consulCfg, options...)
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.delegate = server
} else {
client, err := consul.NewClient(consulCfg, options...)
client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}
@ -487,7 +464,7 @@ func (a *Agent) Start(ctx context.Context) error {
a.State.Delegate = a.delegate
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
if err := a.autoConf.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
}
a.serviceManager.Start()
@ -1297,7 +1274,7 @@ func (a *Agent) ShutdownAgent() error {
// this would be cancelled anyways (by the closing of the shutdown ch) but
// this should help them to be stopped more quickly
a.autoConf.Stop()
a.baseDeps.AutoConfig.Stop()
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
if a.serviceManager != nil {
@ -3472,7 +3449,7 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
// all services, checks, tokens, metadata, dnsServer configs, etc.
// It will also reload all ongoing watches.
func (a *Agent) ReloadConfig() error {
newCfg, err := a.autoConf.ReadConfig()
newCfg, err := a.baseDeps.AutoConfig.ReadConfig()
if err != nil {
return err
}

View File

@ -152,7 +152,7 @@ func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) (
handler.ServeHTTP(resp, req)
return nil, nil
}
return s.agent.MemSink.DisplayMetrics(resp, req)
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
}
func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@ -4607,7 +4607,7 @@ func TestSharedRPCRouter(t *testing.T) {
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
mgr, server := srv.Agent.router.FindLANRoute()
mgr, server := srv.Agent.baseDeps.Router.FindLANRoute()
require.NotNil(t, mgr)
require.NotNil(t, server)
@ -4619,7 +4619,7 @@ func TestSharedRPCRouter(t *testing.T) {
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
mgr, server = client.Agent.router.FindLANRoute()
mgr, server = client.Agent.baseDeps.Router.FindLANRoute()
require.NotNil(t, mgr)
require.NotNil(t, server)
}

View File

@ -22,18 +22,6 @@ import (
)
const (
// clientRPCConnMaxIdle controls how long we keep an idle connection
// open to a server. 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
clientRPCConnMaxIdle = 127 * time.Second
// clientMaxStreams controls how many idle streams we keep
// open to a server
clientMaxStreams = 32
// serfEventBacklog is the maximum number of unprocessed Serf Events
// that will be held in queue before new serf events block. A
// blocking serf event queue is a bad thing.
@ -89,12 +77,7 @@ type Client struct {
}
// NewClient creates and returns a Client
func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
flat := flattenConsulOptions(options)
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
func NewClient(config *Config, deps Deps) (*Client, error) {
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
@ -104,35 +87,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
if err := config.CheckACL(); err != nil {
return nil, err
}
if flat.logger == nil {
return nil, fmt.Errorf("logger is required")
}
if flat.router == nil {
return nil, fmt.Errorf("router is required")
}
if connPool == nil {
connPool = &pool.ConnPool{
Server: false,
SrcAddr: config.RPCSrcAddr,
Logger: flat.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
logger := flat.logger.NamedIntercept(logging.ConsulClient)
// Create client
c := &Client{
config: config,
connPool: connPool,
connPool: deps.ConnPool,
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
logger: deps.Logger.NamedIntercept(logging.ConsulClient),
shutdownCh: make(chan struct{}),
tlsConfigurator: tlsConfigurator,
tlsConfigurator: deps.TLSConfigurator,
}
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
@ -164,11 +126,11 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
}
if err := flat.router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
}
c.router = flat.router
c.router = deps.Router
// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.

View File

@ -9,8 +9,10 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
@ -66,22 +68,8 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
if cb != nil {
cb(config)
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: config.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
if err != nil {
t.Fatalf("err: %v", err)
}
r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
client, err := NewClient(config,
WithLogger(logger),
WithTLSConfigurator(tlsConf),
WithRouter(r))
client, err := NewClient(config, newDefaultDeps(t, config))
return dir, client, err
}
@ -472,18 +460,7 @@ func TestClient_RPC_TLS(t *testing.T) {
func newClient(t *testing.T, config *Config) *Client {
t.Helper()
c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
require.NoError(t, err, "failed to create tls configuration")
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
client, err := NewClient(config,
WithLogger(logger),
WithTLSConfigurator(c),
WithRouter(r))
client, err := NewClient(config, newDefaultDeps(t, config))
require.NoError(t, err, "failed to create client")
t.Cleanup(func() {
client.Shutdown()
@ -491,6 +468,39 @@ func newClient(t *testing.T, config *Config) *Client {
return client
}
func newDefaultDeps(t *testing.T, c *Config) Deps {
t.Helper()
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: c.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
require.NoError(t, err, "failed to create tls configuration")
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
connPool := &pool.ConnPool{
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
}
return Deps{
Logger: logger,
TLSConfigurator: tls,
Tokens: new(token.Store),
Router: r,
ConnPool: connPool,
}
}
func TestClient_RPC_RateLimit(t *testing.T) {
t.Parallel()
_, conf1 := testServerConfig(t)

View File

@ -9,14 +9,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
@ -1304,15 +1301,11 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
Level: hclog.Debug,
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
})
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
require.NoError(t, err)
rpcRouter := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
srv, err := NewServer(config,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(tlsConf),
WithRouter(rpcRouter))
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps)
require.NoError(t, err)
defer srv.Shutdown()

View File

@ -8,50 +8,10 @@ import (
"github.com/hashicorp/go-hclog"
)
type consulOptions struct {
logger hclog.InterceptLogger
tlsConfigurator *tlsutil.Configurator
connPool *pool.ConnPool
tokens *token.Store
router *router.Router
}
type ConsulOption func(*consulOptions)
func WithLogger(logger hclog.InterceptLogger) ConsulOption {
return func(opt *consulOptions) {
opt.logger = logger
}
}
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption {
return func(opt *consulOptions) {
opt.tlsConfigurator = tlsConfigurator
}
}
func WithConnectionPool(connPool *pool.ConnPool) ConsulOption {
return func(opt *consulOptions) {
opt.connPool = connPool
}
}
func WithTokenStore(tokens *token.Store) ConsulOption {
return func(opt *consulOptions) {
opt.tokens = tokens
}
}
func WithRouter(router *router.Router) ConsulOption {
return func(opt *consulOptions) {
opt.router = router
}
}
func flattenConsulOptions(options []ConsulOption) consulOptions {
var flat consulOptions
for _, opt := range options {
opt(&flat)
}
return flat
type Deps struct {
Logger hclog.InterceptLogger
TLSConfigurator *tlsutil.Configurator
Tokens *token.Store
Router *router.Router
ConnPool *pool.ConnPool
}

View File

@ -70,14 +70,6 @@ const (
raftState = "raft/"
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection
// open to a server
serverRPCCache = 2 * time.Minute
// serverMaxStreams controls how many idle streams we keep
// open to a server
serverMaxStreams = 64
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
@ -324,14 +316,8 @@ type connHandler interface {
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
flat := flattenConsulOptions(options)
logger := flat.logger
tokens := flat.tokens
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
func NewServer(config *Config, flat Deps) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
@ -341,12 +327,6 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
if err := config.CheckACL(); err != nil {
return nil, err
}
if logger == nil {
return nil, fmt.Errorf("logger is required")
}
if flat.router == nil {
return nil, fmt.Errorf("router is required")
}
// Check if TLS is enabled
if config.CAFile != "" || config.CAPath != "" {
@ -375,36 +355,24 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
if connPool == nil {
connPool = &pool.ConnPool{
Server: true,
SrcAddr: config.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
serverLogger := logger.NamedIntercept(logging.ConsulServer)
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
loggers := newLoggerStore(serverLogger)
// Create server.
s := &Server{
config: config,
tokens: tokens,
connPool: connPool,
tokens: flat.Tokens,
connPool: flat.ConnPool,
eventChLAN: make(chan serf.Event, serfEventChSize),
eventChWAN: make(chan serf.Event, serfEventChSize),
logger: serverLogger,
loggers: loggers,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.router,
router: flat.Router,
rpcServer: rpc.NewServer(),
insecureRPCServer: rpc.NewServer(),
tlsConfigurator: tlsConfigurator,
tlsConfigurator: flat.TLSConfigurator,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),

View File

@ -17,7 +17,6 @@ import (
"github.com/google/tcpproxy"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/memberlist"
@ -31,7 +30,6 @@ import (
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
@ -293,22 +291,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
}
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: c.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tlsConf, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
if err != nil {
return nil, err
}
rpcRouter := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
srv, err := NewServer(c,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(tlsConf),
WithRouter(rpcRouter))
srv, err := NewServer(c, newDefaultDeps(t, c))
if err != nil {
return nil, err
}
@ -1492,19 +1475,11 @@ func TestServer_CALogging(t *testing.T) {
var buf bytes.Buffer
logger := testutil.LoggerWithOutput(t, &buf)
c, err := tlsutil.NewConfigurator(conf1.ToTLSUtilConfig(), logger)
deps := newDefaultDeps(t, conf1)
deps.Logger = logger
s1, err := NewServer(conf1, deps)
require.NoError(t, err)
rpcRouter := router.NewRouter(logger, "dc1", fmt.Sprintf("%s.%s", "nodename", "dc1"))
s1, err := NewServer(conf1,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(c),
WithRouter(rpcRouter))
if err != nil {
t.Fatalf("err: %v", err)
}
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -10,6 +10,7 @@ import (
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
@ -25,15 +26,12 @@ import (
// has been moved out in front of Agent.New, and we can better see the setup
// dependencies.
type BaseDeps struct {
Logger hclog.InterceptLogger
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
MetricsHandler MetricsHandler
RuntimeConfig *config.RuntimeConfig
Tokens *token.Store
Cache *cache.Cache
AutoConfig *autoconf.AutoConfig // TODO: use an interface
ConnPool *pool.ConnPool // TODO: use an interface
Router *router.Router
consul.Deps // TODO: un-embed
RuntimeConfig *config.RuntimeConfig
MetricsHandler MetricsHandler
AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache
}
// MetricsHandler provides an http.Handler for displaying metrics.
@ -120,6 +118,12 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
pool.MaxTime = 2 * time.Minute
pool.MaxStreams = 64
} else {
// MaxTime controls how long we keep an idle connection open to a server.
// 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
pool.MaxTime = 127 * time.Second
pool.MaxStreams = 32
}