mirror of https://github.com/status-im/consul.git
agent: extract dependency creation from New
With this change, Agent.New() accepts many of the dependencies instead of creating them in New. Accepting fully constructed dependencies from a constructor makes the type easier to test, and easier to change. There are still a number of dependencies created in Start() which can be addressed in a follow up.
This commit is contained in:
parent
51b08c645b
commit
5d4df54296
11
agent/acl.go
11
agent/acl.go
|
@ -59,7 +59,7 @@ func (a *Agent) aclAccessorID(secretID string) string {
|
||||||
return ident.ID()
|
return ident.ID()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) initializeACLs() error {
|
func initializeACLs(nodeName string) (acl.Authorizer, error) {
|
||||||
// Build a policy for the agent master token.
|
// Build a policy for the agent master token.
|
||||||
// The builtin agent master policy allows reading any node information
|
// The builtin agent master policy allows reading any node information
|
||||||
// and allows writes to the agent with the node name of the running agent
|
// and allows writes to the agent with the node name of the running agent
|
||||||
|
@ -69,7 +69,7 @@ func (a *Agent) initializeACLs() error {
|
||||||
PolicyRules: acl.PolicyRules{
|
PolicyRules: acl.PolicyRules{
|
||||||
Agents: []*acl.AgentRule{
|
Agents: []*acl.AgentRule{
|
||||||
{
|
{
|
||||||
Node: a.config.NodeName,
|
Node: nodeName,
|
||||||
Policy: acl.PolicyWrite,
|
Policy: acl.PolicyWrite,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -81,12 +81,7 @@ func (a *Agent) initializeACLs() error {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
master, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
|
return acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
a.aclMasterAuthorizer = master
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// vetServiceRegister makes sure the service registration action is allowed by
|
// vetServiceRegister makes sure the service registration action is allowed by
|
||||||
|
|
|
@ -35,37 +35,38 @@ type TestACLAgent struct {
|
||||||
// Basically it needs a local state for some of the vet* functions, a logger and a delegate.
|
// Basically it needs a local state for some of the vet* functions, a logger and a delegate.
|
||||||
// The key is that we are the delegate so we can control the ResolveToken responses
|
// The key is that we are the delegate so we can control the ResolveToken responses
|
||||||
func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzResolver, resolveIdent identResolver) *TestACLAgent {
|
func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzResolver, resolveIdent identResolver) *TestACLAgent {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
a := &TestACLAgent{resolveAuthzFn: resolveAuthz, resolveIdentFn: resolveIdent}
|
a := &TestACLAgent{resolveAuthzFn: resolveAuthz, resolveIdentFn: resolveIdent}
|
||||||
|
|
||||||
dataDir := testutil.TempDir(t, "acl-agent")
|
dataDir := testutil.TempDir(t, "acl-agent")
|
||||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
|
||||||
Name: name,
|
|
||||||
Level: hclog.Debug,
|
|
||||||
Output: testutil.NewLogBuffer(t),
|
|
||||||
})
|
|
||||||
|
|
||||||
opts := []AgentOption{
|
logBuffer := testutil.NewLogBuffer(t)
|
||||||
WithLogger(logger),
|
loader := func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error) {
|
||||||
WithBuilderOpts(config.BuilderOpts{
|
dataDir := fmt.Sprintf(`data_dir = "%s"`, dataDir)
|
||||||
HCL: []string{
|
opts := config.BuilderOpts{
|
||||||
TestConfigHCL(NodeID()),
|
HCL: []string{TestConfigHCL(NodeID()), hcl, dataDir},
|
||||||
hcl,
|
}
|
||||||
fmt.Sprintf(`data_dir = "%s"`, dataDir),
|
return config.Load(opts, source)
|
||||||
},
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
|
bd, err := NewBaseDeps(loader, logBuffer)
|
||||||
agent, err := New(opts...)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
cfg := agent.GetConfig()
|
|
||||||
|
bd.Logger = hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||||
|
Name: name,
|
||||||
|
Level: hclog.Debug,
|
||||||
|
Output: logBuffer,
|
||||||
|
TimeFormat: "04:05.000",
|
||||||
|
})
|
||||||
|
bd.TelemetrySink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||||
|
|
||||||
|
agent, err := New(bd)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
agent.delegate = a
|
||||||
|
agent.State = local.NewState(LocalConfig(bd.RuntimeConfig), bd.Logger, bd.Tokens)
|
||||||
|
agent.State.TriggerSyncChanges = func() {}
|
||||||
a.Agent = agent
|
a.Agent = agent
|
||||||
|
|
||||||
agent.logger = logger
|
|
||||||
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
|
||||||
|
|
||||||
a.Agent.delegate = a
|
|
||||||
a.Agent.State = local.NewState(LocalConfig(cfg), logger, a.Agent.tokens)
|
|
||||||
a.Agent.State.TriggerSyncChanges = func() {}
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
265
agent/agent.go
265
agent/agent.go
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/hashicorp/go-connlimit"
|
"github.com/hashicorp/go-connlimit"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/mitchellh/cli"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
|
@ -312,85 +311,6 @@ type Agent struct {
|
||||||
enterpriseAgent
|
enterpriseAgent
|
||||||
}
|
}
|
||||||
|
|
||||||
type agentOptions struct {
|
|
||||||
logger hclog.InterceptLogger
|
|
||||||
builderOpts config.BuilderOpts
|
|
||||||
ui cli.Ui
|
|
||||||
config *config.RuntimeConfig
|
|
||||||
overrides []config.Source
|
|
||||||
writers []io.Writer
|
|
||||||
initTelemetry bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type AgentOption func(opt *agentOptions)
|
|
||||||
|
|
||||||
// WithTelemetry is used to control whether the agent will
|
|
||||||
// set up metrics.
|
|
||||||
func WithTelemetry(initTelemetry bool) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.initTelemetry = initTelemetry
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLogger is used to override any automatic logger creation
|
|
||||||
// and provide one already built instead. This is mostly useful
|
|
||||||
// for testing.
|
|
||||||
func WithLogger(logger hclog.InterceptLogger) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.logger = logger
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithBuilderOpts specifies the command line config.BuilderOpts to use that the agent
|
|
||||||
// is being started with
|
|
||||||
func WithBuilderOpts(builderOpts config.BuilderOpts) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.builderOpts = builderOpts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithCLI provides a cli.Ui instance to use when emitting configuration
|
|
||||||
// warnings during the first configuration parsing.
|
|
||||||
func WithCLI(ui cli.Ui) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.ui = ui
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLogWriter will add an additional log output to the logger that gets
|
|
||||||
// configured after configuration parsing
|
|
||||||
func WithLogWriter(writer io.Writer) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.writers = append(opt.writers, writer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithOverrides is used to provide a config source to append to the tail sources
|
|
||||||
// during config building. It is really only useful for testing to tune non-user
|
|
||||||
// configurable tunables to make various tests converge more quickly than they
|
|
||||||
// could otherwise.
|
|
||||||
func WithOverrides(overrides ...config.Source) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.overrides = overrides
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithConfig provides an already parsed configuration to the Agent
|
|
||||||
// Deprecated: Should allow the agent to parse the configuration.
|
|
||||||
func WithConfig(config *config.RuntimeConfig) AgentOption {
|
|
||||||
return func(opt *agentOptions) {
|
|
||||||
opt.config = config
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func flattenAgentOptions(options []AgentOption) agentOptions {
|
|
||||||
var flat agentOptions
|
|
||||||
for _, opt := range options {
|
|
||||||
opt(&flat)
|
|
||||||
}
|
|
||||||
return flat
|
|
||||||
}
|
|
||||||
|
|
||||||
// New process the desired options and creates a new Agent.
|
// New process the desired options and creates a new Agent.
|
||||||
// This process will
|
// This process will
|
||||||
// * parse the config given the config Flags
|
// * parse the config given the config Flags
|
||||||
|
@ -406,10 +326,7 @@ func flattenAgentOptions(options []AgentOption) agentOptions {
|
||||||
// * setup the NodeID if one isn't provided in the configuration
|
// * setup the NodeID if one isn't provided in the configuration
|
||||||
// * create the AutoConfig object for future use in fully
|
// * create the AutoConfig object for future use in fully
|
||||||
// resolving the configuration
|
// resolving the configuration
|
||||||
func New(options ...AgentOption) (*Agent, error) {
|
func New(bd BaseDeps) (*Agent, error) {
|
||||||
flat := flattenAgentOptions(options)
|
|
||||||
|
|
||||||
// Create most of the agent
|
|
||||||
a := Agent{
|
a := Agent{
|
||||||
checkReapAfter: make(map[structs.CheckID]time.Duration),
|
checkReapAfter: make(map[structs.CheckID]time.Duration),
|
||||||
checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor),
|
checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor),
|
||||||
|
@ -425,94 +342,28 @@ func New(options ...AgentOption) (*Agent, error) {
|
||||||
retryJoinCh: make(chan error),
|
retryJoinCh: make(chan error),
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
endpoints: make(map[string]string),
|
endpoints: make(map[string]string),
|
||||||
tokens: new(token.Store),
|
|
||||||
logger: flat.logger,
|
// TODO: store the BaseDeps instead of copying them over to Agent
|
||||||
|
tokens: bd.Tokens,
|
||||||
|
logger: bd.Logger,
|
||||||
|
tlsConfigurator: bd.TLSConfigurator,
|
||||||
|
config: bd.RuntimeConfig,
|
||||||
|
cache: bd.Cache,
|
||||||
|
MemSink: bd.TelemetrySink,
|
||||||
|
connPool: bd.ConnPool,
|
||||||
|
autoConf: bd.AutoConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse the configuration and handle the error/warnings
|
// TODO: set globals somewhere else, not Agent.New
|
||||||
cfg, warnings, err := config.Load(flat.builderOpts, nil, flat.overrides...)
|
grpclog.SetLoggerV2(logging.NewGRPCLogger(bd.RuntimeConfig.LogLevel, bd.Logger))
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, w := range warnings {
|
|
||||||
if a.logger != nil {
|
|
||||||
a.logger.Warn(w)
|
|
||||||
} else if flat.ui != nil {
|
|
||||||
flat.ui.Warn(w)
|
|
||||||
} else {
|
|
||||||
fmt.Fprint(os.Stderr, w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the config in the agent, this is just the preliminary configuration as we haven't
|
|
||||||
// loaded any auto-config sources yet.
|
|
||||||
a.config = cfg
|
|
||||||
|
|
||||||
// create the cache using the rate limiting settings from the config. Note that this means
|
|
||||||
// that these limits are not reloadable.
|
|
||||||
a.cache = cache.New(a.config.Cache)
|
|
||||||
|
|
||||||
if flat.logger == nil {
|
|
||||||
logConf := &logging.Config{
|
|
||||||
LogLevel: cfg.LogLevel,
|
|
||||||
LogJSON: cfg.LogJSON,
|
|
||||||
Name: logging.Agent,
|
|
||||||
EnableSyslog: cfg.EnableSyslog,
|
|
||||||
SyslogFacility: cfg.SyslogFacility,
|
|
||||||
LogFilePath: cfg.LogFile,
|
|
||||||
LogRotateDuration: cfg.LogRotateDuration,
|
|
||||||
LogRotateBytes: cfg.LogRotateBytes,
|
|
||||||
LogRotateMaxFiles: cfg.LogRotateMaxFiles,
|
|
||||||
}
|
|
||||||
|
|
||||||
a.logger, err = logging.Setup(logConf, flat.writers)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
grpclog.SetLoggerV2(logging.NewGRPCLogger(logConf, a.logger))
|
|
||||||
}
|
|
||||||
|
|
||||||
if flat.initTelemetry {
|
|
||||||
memSink, err := lib.InitTelemetry(cfg.Telemetry)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to initialize telemetry: %w", err)
|
|
||||||
}
|
|
||||||
a.MemSink = memSink
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO (autoconf) figure out how to let this setting be pushed down via autoconf
|
|
||||||
// right now it gets defaulted if unset so this check actually doesn't do much
|
|
||||||
// for a normal running agent.
|
|
||||||
if a.config.Datacenter == "" {
|
|
||||||
return nil, fmt.Errorf("Must configure a Datacenter")
|
|
||||||
}
|
|
||||||
if a.config.DataDir == "" && !a.config.DevMode {
|
|
||||||
return nil, fmt.Errorf("Must configure a DataDir")
|
|
||||||
}
|
|
||||||
|
|
||||||
tlsConfigurator, err := tlsutil.NewConfigurator(a.config.ToTLSUtilConfig(), a.logger)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
a.tlsConfigurator = tlsConfigurator
|
|
||||||
|
|
||||||
err = a.initializeConnectionPool()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to initialize the connection pool: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
|
||||||
if err := a.initializeACLs(); err != nil {
|
// TODO: do this somewhere else, maybe move to newBaseDeps
|
||||||
return nil, err
|
var err error
|
||||||
}
|
a.aclMasterAuthorizer, err = initializeACLs(bd.RuntimeConfig.NodeName)
|
||||||
|
|
||||||
// Retrieve or generate the node ID before setting up the rest of the
|
|
||||||
// agent, which depends on it.
|
|
||||||
cfg.NodeID, err = newNodeIDFromConfig(a.config, a.logger)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to setup node ID: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// We used to do this in the Start method. However it doesn't need to go
|
// We used to do this in the Start method. However it doesn't need to go
|
||||||
|
@ -521,48 +372,9 @@ func New(options ...AgentOption) (*Agent, error) {
|
||||||
// pass the agent itself so its safe to move here.
|
// pass the agent itself so its safe to move here.
|
||||||
a.registerCache()
|
a.registerCache()
|
||||||
|
|
||||||
cmConf := new(certmon.Config).
|
|
||||||
WithCache(a.cache).
|
|
||||||
WithTLSConfigurator(a.tlsConfigurator).
|
|
||||||
WithDNSSANs(a.config.AutoConfig.DNSSANs).
|
|
||||||
WithIPSANs(a.config.AutoConfig.IPSANs).
|
|
||||||
WithDatacenter(a.config.Datacenter).
|
|
||||||
WithNodeName(a.config.NodeName).
|
|
||||||
WithFallback(a.autoConfigFallbackTLS).
|
|
||||||
WithLogger(a.logger.Named(logging.AutoConfig)).
|
|
||||||
WithTokens(a.tokens).
|
|
||||||
WithPersistence(a.autoConfigPersist)
|
|
||||||
acCertMon, err := certmon.New(cmConf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
acConf := autoconf.Config{
|
|
||||||
DirectRPC: a.connPool,
|
|
||||||
Logger: a.logger,
|
|
||||||
CertMonitor: acCertMon,
|
|
||||||
Loader: func(source config.Source) (*config.RuntimeConfig, []string, error) {
|
|
||||||
return config.Load(flat.builderOpts, source, flat.overrides...)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
ac, err := autoconf.New(acConf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
a.autoConf = ac
|
|
||||||
|
|
||||||
return &a, nil
|
return &a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLogger retrieves the agents logger
|
|
||||||
// TODO make export the logger field and get rid of this method
|
|
||||||
// This is here for now to simplify the work I am doing and make
|
|
||||||
// reviewing the final PR easier.
|
|
||||||
func (a *Agent) GetLogger() hclog.InterceptLogger {
|
|
||||||
return a.logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetConfig retrieves the agents config
|
// GetConfig retrieves the agents config
|
||||||
// TODO make export the config field and get rid of this method
|
// TODO make export the config field and get rid of this method
|
||||||
// This is here for now to simplify the work I am doing and make
|
// This is here for now to simplify the work I am doing and make
|
||||||
|
@ -573,31 +385,6 @@ func (a *Agent) GetConfig() *config.RuntimeConfig {
|
||||||
return a.config
|
return a.config
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) initializeConnectionPool() error {
|
|
||||||
var rpcSrcAddr *net.TCPAddr
|
|
||||||
if !ipaddr.IsAny(a.config.RPCBindAddr) {
|
|
||||||
rpcSrcAddr = &net.TCPAddr{IP: a.config.RPCBindAddr.IP}
|
|
||||||
}
|
|
||||||
|
|
||||||
pool := &pool.ConnPool{
|
|
||||||
Server: a.config.ServerMode,
|
|
||||||
SrcAddr: rpcSrcAddr,
|
|
||||||
Logger: a.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
|
||||||
TLSConfigurator: a.tlsConfigurator,
|
|
||||||
Datacenter: a.config.Datacenter,
|
|
||||||
}
|
|
||||||
if a.config.ServerMode {
|
|
||||||
pool.MaxTime = 2 * time.Minute
|
|
||||||
pool.MaxStreams = 64
|
|
||||||
} else {
|
|
||||||
pool.MaxTime = 127 * time.Second
|
|
||||||
pool.MaxStreams = 32
|
|
||||||
}
|
|
||||||
|
|
||||||
a.connPool = pool
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
|
// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
|
||||||
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
||||||
lc := local.Config{
|
lc := local.Config{
|
||||||
|
@ -638,8 +425,8 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
|
return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// load the tokens - this requires the logger to be setup
|
// TODO: move to newBaseDeps
|
||||||
// which is why we can't do this in New
|
// TODO: handle error
|
||||||
a.loadTokens(a.config)
|
a.loadTokens(a.config)
|
||||||
a.loadEnterpriseTokens(a.config)
|
a.loadEnterpriseTokens(a.config)
|
||||||
|
|
||||||
|
@ -858,20 +645,6 @@ func (a *Agent) autoEncryptInitialCertificate(ctx context.Context) (*structs.Sig
|
||||||
return client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken(), a.config.AutoEncryptDNSSAN, a.config.AutoEncryptIPSAN)
|
return client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken(), a.config.AutoEncryptDNSSAN, a.config.AutoEncryptIPSAN)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) autoConfigFallbackTLS(ctx context.Context) (*structs.SignedResponse, error) {
|
|
||||||
if a.autoConf == nil {
|
|
||||||
return nil, fmt.Errorf("AutoConfig manager has not been created yet")
|
|
||||||
}
|
|
||||||
return a.autoConf.FallbackTLS(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *Agent) autoConfigPersist(resp *structs.SignedResponse) error {
|
|
||||||
if a.autoConf == nil {
|
|
||||||
return fmt.Errorf("AutoConfig manager has not been created yet")
|
|
||||||
}
|
|
||||||
return a.autoConf.RecordUpdatedCerts(resp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *Agent) listenAndServeGRPC() error {
|
func (a *Agent) listenAndServeGRPC() error {
|
||||||
if len(a.config.GRPCAddrs) < 1 {
|
if len(a.config.GRPCAddrs) < 1 {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -35,11 +35,11 @@ import (
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadConfig will build the configuration including the extraHead source injected
|
// Load will build the configuration including the extraHead source injected
|
||||||
// after all other defaults but before any user supplied configuration and the overrides
|
// after all other defaults but before any user supplied configuration and the overrides
|
||||||
// source injected as the final source in the configuration parsing chain.
|
// source injected as the final source in the configuration parsing chain.
|
||||||
func Load(builderOpts BuilderOpts, extraHead Source, overrides ...Source) (*RuntimeConfig, []string, error) {
|
func Load(opts BuilderOpts, extraHead Source, overrides ...Source) (*RuntimeConfig, []string, error) {
|
||||||
b, err := NewBuilder(builderOpts)
|
b, err := NewBuilder(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
certmon "github.com/hashicorp/consul/agent/cert-monitor"
|
||||||
|
"github.com/hashicorp/consul/agent/config"
|
||||||
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
"github.com/hashicorp/consul/ipaddr"
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: BaseDeps should be renamed in the future once more of Agent.Start
|
||||||
|
// has been moved out in front of Agent.New, and we can better see the setup
|
||||||
|
// dependencies.
|
||||||
|
type BaseDeps struct {
|
||||||
|
Logger hclog.InterceptLogger
|
||||||
|
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
|
||||||
|
TelemetrySink *metrics.InmemSink // TODO: use an interface
|
||||||
|
RuntimeConfig *config.RuntimeConfig
|
||||||
|
Tokens *token.Store
|
||||||
|
Cache *cache.Cache
|
||||||
|
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||||
|
ConnPool *pool.ConnPool // TODO: use an interface
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConfigLoader func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error)
|
||||||
|
|
||||||
|
func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) {
|
||||||
|
d := BaseDeps{}
|
||||||
|
cfg, warnings, err := configLoader(nil)
|
||||||
|
if err != nil {
|
||||||
|
return d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: use logging.Config in RuntimeConfig instead of separate fields
|
||||||
|
logConf := &logging.Config{
|
||||||
|
LogLevel: cfg.LogLevel,
|
||||||
|
LogJSON: cfg.LogJSON,
|
||||||
|
Name: logging.Agent,
|
||||||
|
EnableSyslog: cfg.EnableSyslog,
|
||||||
|
SyslogFacility: cfg.SyslogFacility,
|
||||||
|
LogFilePath: cfg.LogFile,
|
||||||
|
LogRotateDuration: cfg.LogRotateDuration,
|
||||||
|
LogRotateBytes: cfg.LogRotateBytes,
|
||||||
|
LogRotateMaxFiles: cfg.LogRotateMaxFiles,
|
||||||
|
}
|
||||||
|
d.Logger, err = logging.Setup(logConf, []io.Writer{logOut})
|
||||||
|
if err != nil {
|
||||||
|
return d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, w := range warnings {
|
||||||
|
d.Logger.Warn(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.NodeID, err = newNodeIDFromConfig(cfg, d.Logger)
|
||||||
|
if err != nil {
|
||||||
|
return d, fmt.Errorf("failed to setup node ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.TelemetrySink, err = lib.InitTelemetry(cfg.Telemetry)
|
||||||
|
if err != nil {
|
||||||
|
return d, fmt.Errorf("failed to initialize telemetry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.ToTLSUtilConfig(), d.Logger)
|
||||||
|
if err != nil {
|
||||||
|
return d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
d.RuntimeConfig = cfg
|
||||||
|
d.Tokens = new(token.Store)
|
||||||
|
// cache-types are not registered yet, but they won't be used until the components are started.
|
||||||
|
d.Cache = cache.New(cfg.Cache)
|
||||||
|
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
|
||||||
|
|
||||||
|
deferredAC := &deferredAutoConfig{}
|
||||||
|
|
||||||
|
cmConf := new(certmon.Config).
|
||||||
|
WithCache(d.Cache).
|
||||||
|
WithTLSConfigurator(d.TLSConfigurator).
|
||||||
|
WithDNSSANs(cfg.AutoConfig.DNSSANs).
|
||||||
|
WithIPSANs(cfg.AutoConfig.IPSANs).
|
||||||
|
WithDatacenter(cfg.Datacenter).
|
||||||
|
WithNodeName(cfg.NodeName).
|
||||||
|
WithFallback(deferredAC.autoConfigFallbackTLS).
|
||||||
|
WithLogger(d.Logger.Named(logging.AutoConfig)).
|
||||||
|
WithTokens(d.Tokens).
|
||||||
|
WithPersistence(deferredAC.autoConfigPersist)
|
||||||
|
acCertMon, err := certmon.New(cmConf)
|
||||||
|
if err != nil {
|
||||||
|
return d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
acConf := autoconf.Config{
|
||||||
|
DirectRPC: d.ConnPool,
|
||||||
|
Logger: d.Logger,
|
||||||
|
CertMonitor: acCertMon,
|
||||||
|
Loader: configLoader,
|
||||||
|
}
|
||||||
|
d.AutoConfig, err = autoconf.New(acConf)
|
||||||
|
if err != nil {
|
||||||
|
return d, err
|
||||||
|
}
|
||||||
|
// TODO: can this cyclic dependency be un-cycled?
|
||||||
|
deferredAC.autoConf = d.AutoConfig
|
||||||
|
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
|
||||||
|
var rpcSrcAddr *net.TCPAddr
|
||||||
|
if !ipaddr.IsAny(config.RPCBindAddr) {
|
||||||
|
rpcSrcAddr = &net.TCPAddr{IP: config.RPCBindAddr.IP}
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := &pool.ConnPool{
|
||||||
|
Server: config.ServerMode,
|
||||||
|
SrcAddr: rpcSrcAddr,
|
||||||
|
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||||
|
TLSConfigurator: tls,
|
||||||
|
Datacenter: config.Datacenter,
|
||||||
|
}
|
||||||
|
if config.ServerMode {
|
||||||
|
pool.MaxTime = 2 * time.Minute
|
||||||
|
pool.MaxStreams = 64
|
||||||
|
} else {
|
||||||
|
pool.MaxTime = 127 * time.Second
|
||||||
|
pool.MaxStreams = 32
|
||||||
|
}
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
|
type deferredAutoConfig struct {
|
||||||
|
autoConf *autoconf.AutoConfig // TODO: use an interface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *deferredAutoConfig) autoConfigFallbackTLS(ctx context.Context) (*structs.SignedResponse, error) {
|
||||||
|
if a.autoConf == nil {
|
||||||
|
return nil, fmt.Errorf("AutoConfig manager has not been created yet")
|
||||||
|
}
|
||||||
|
return a.autoConf.FallbackTLS(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *deferredAutoConfig) autoConfigPersist(resp *structs.SignedResponse) error {
|
||||||
|
if a.autoConf == nil {
|
||||||
|
return fmt.Errorf("AutoConfig manager has not been created yet")
|
||||||
|
}
|
||||||
|
return a.autoConf.RecordUpdatedCerts(resp)
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/hashicorp/errwrap"
|
"github.com/hashicorp/errwrap"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
uuid "github.com/hashicorp/go-uuid"
|
uuid "github.com/hashicorp/go-uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
|
@ -167,38 +168,35 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
||||||
portsConfig, returnPortsFn := randomPortsSource(a.UseTLS)
|
portsConfig, returnPortsFn := randomPortsSource(a.UseTLS)
|
||||||
t.Cleanup(returnPortsFn)
|
t.Cleanup(returnPortsFn)
|
||||||
|
|
||||||
nodeID := NodeID()
|
// Create NodeID outside the closure, so that it does not change
|
||||||
|
testHCLConfig := TestConfigHCL(NodeID())
|
||||||
opts := []AgentOption{
|
loader := func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error) {
|
||||||
WithLogger(logger),
|
opts := config.BuilderOpts{
|
||||||
WithBuilderOpts(config.BuilderOpts{
|
HCL: []string{testHCLConfig, portsConfig, a.HCL, hclDataDir},
|
||||||
HCL: []string{
|
}
|
||||||
TestConfigHCL(nodeID),
|
overrides := []config.Source{
|
||||||
portsConfig,
|
config.FileSource{
|
||||||
a.HCL,
|
Name: "test-overrides",
|
||||||
hclDataDir,
|
Format: "hcl",
|
||||||
},
|
Data: a.Overrides},
|
||||||
}),
|
|
||||||
WithOverrides(config.FileSource{
|
|
||||||
Name: "test-overrides",
|
|
||||||
Format: "hcl",
|
|
||||||
Data: a.Overrides},
|
|
||||||
config.DefaultConsulSource(),
|
config.DefaultConsulSource(),
|
||||||
config.DevConsulSource(),
|
config.DevConsulSource(),
|
||||||
),
|
}
|
||||||
|
return config.Load(opts, source, overrides...)
|
||||||
}
|
}
|
||||||
|
bd, err := NewBaseDeps(loader, logOutput)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
agent, err := New(opts...)
|
bd.Logger = logger
|
||||||
|
bd.TelemetrySink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||||
|
a.Config = bd.RuntimeConfig
|
||||||
|
|
||||||
|
agent, err := New(bd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error creating agent: %s", err)
|
return fmt.Errorf("Error creating agent: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
a.Config = agent.GetConfig()
|
|
||||||
|
|
||||||
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
|
||||||
|
|
||||||
id := string(a.Config.NodeID)
|
id := string(a.Config.NodeID)
|
||||||
|
|
||||||
if err := agent.Start(context.Background()); err != nil {
|
if err := agent.Start(context.Background()); err != nil {
|
||||||
agent.ShutdownAgent()
|
agent.ShutdownAgent()
|
||||||
agent.ShutdownEndpoints()
|
agent.ShutdownEndpoints()
|
||||||
|
|
|
@ -60,11 +60,6 @@ type cmd struct {
|
||||||
logger hclog.InterceptLogger
|
logger hclog.InterceptLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
type GatedUi struct {
|
|
||||||
JSONoutput bool
|
|
||||||
ui cli.Ui
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cmd) init() {
|
func (c *cmd) init() {
|
||||||
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
||||||
config.AddFlags(c.flags, &c.flagArgs)
|
config.AddFlags(c.flags, &c.flagArgs)
|
||||||
|
@ -165,25 +160,26 @@ func (c *cmd) run(args []string) int {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
logGate := logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
logGate := &logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
||||||
|
loader := func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error) {
|
||||||
agentOptions := []agent.AgentOption{
|
return config.Load(c.flagArgs, source)
|
||||||
agent.WithBuilderOpts(c.flagArgs),
|
|
||||||
agent.WithCLI(c.UI),
|
|
||||||
agent.WithLogWriter(&logGate),
|
|
||||||
agent.WithTelemetry(true),
|
|
||||||
}
|
}
|
||||||
|
bd, err := agent.NewBaseDeps(loader, logGate)
|
||||||
agent, err := agent.New(agentOptions...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.UI.Error(err.Error())
|
c.UI.Error(err.Error())
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
config := agent.GetConfig()
|
c.logger = bd.Logger
|
||||||
c.logger = agent.GetLogger()
|
agent, err := agent.New(bd)
|
||||||
|
if err != nil {
|
||||||
|
c.UI.Error(err.Error())
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
//Setup gate to check if we should output CLI information
|
config := bd.RuntimeConfig
|
||||||
|
|
||||||
|
// Setup gate to check if we should output CLI information
|
||||||
cli := GatedUi{
|
cli := GatedUi{
|
||||||
JSONoutput: config.LogJSON,
|
JSONoutput: config.LogJSON,
|
||||||
ui: c.UI,
|
ui: c.UI,
|
||||||
|
@ -344,6 +340,11 @@ func (c *cmd) run(args []string) int {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GatedUi struct {
|
||||||
|
JSONoutput bool
|
||||||
|
ui cli.Ui
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GatedUi) output(s string) {
|
func (g *GatedUi) output(s string) {
|
||||||
if !g.JSONoutput {
|
if !g.JSONoutput {
|
||||||
g.ui.Output(s)
|
g.ui.Output(s)
|
||||||
|
|
|
@ -19,9 +19,9 @@ type GRPCLogger struct {
|
||||||
// Note that grpclog has Info, Warning, Error, Fatal severity levels AND integer
|
// Note that grpclog has Info, Warning, Error, Fatal severity levels AND integer
|
||||||
// verbosity levels for additional info. Verbose logs in glog are always INFO
|
// verbosity levels for additional info. Verbose logs in glog are always INFO
|
||||||
// severity so we map Info,V0 to INFO, Info,V1 to DEBUG, and Info,V>1 to TRACE.
|
// severity so we map Info,V0 to INFO, Info,V1 to DEBUG, and Info,V>1 to TRACE.
|
||||||
func NewGRPCLogger(config *Config, logger hclog.Logger) *GRPCLogger {
|
func NewGRPCLogger(logLevel string, logger hclog.Logger) *GRPCLogger {
|
||||||
return &GRPCLogger{
|
return &GRPCLogger{
|
||||||
level: config.LogLevel,
|
level: logLevel,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestGRPCLogger(t *testing.T) {
|
||||||
Output: &out,
|
Output: &out,
|
||||||
TimeFormat: "timeformat",
|
TimeFormat: "timeformat",
|
||||||
})
|
})
|
||||||
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: "TRACE"}, logger))
|
grpclog.SetLoggerV2(NewGRPCLogger("TRACE", logger))
|
||||||
|
|
||||||
// All of these should output something
|
// All of these should output something
|
||||||
grpclog.Info("Info,")
|
grpclog.Info("Info,")
|
||||||
|
@ -92,7 +92,7 @@ func TestGRPCLogger_V(t *testing.T) {
|
||||||
Level: hclog.Trace,
|
Level: hclog.Trace,
|
||||||
Output: &out,
|
Output: &out,
|
||||||
})
|
})
|
||||||
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: tt.level}, logger))
|
grpclog.SetLoggerV2(NewGRPCLogger(tt.level, logger))
|
||||||
|
|
||||||
assert.Equal(t, tt.want, grpclog.V(tt.v))
|
assert.Equal(t, tt.want, grpclog.V(tt.v))
|
||||||
})
|
})
|
||||||
|
|
|
@ -63,6 +63,7 @@ type LogSetupErrorFn func(string)
|
||||||
// The provided ui object will get any log messages related to setting up
|
// The provided ui object will get any log messages related to setting up
|
||||||
// logging itself, and will also be hooked up to the gated logger. The final bool
|
// logging itself, and will also be hooked up to the gated logger. The final bool
|
||||||
// parameter indicates if logging was set up successfully.
|
// parameter indicates if logging was set up successfully.
|
||||||
|
// TODO: accept a single io.Writer
|
||||||
func Setup(config *Config, writers []io.Writer) (hclog.InterceptLogger, error) {
|
func Setup(config *Config, writers []io.Writer) (hclog.InterceptLogger, error) {
|
||||||
if !ValidateLogLevel(config.LogLevel) {
|
if !ValidateLogLevel(config.LogLevel) {
|
||||||
return nil, fmt.Errorf("Invalid log level: %s. Valid log levels are: %v",
|
return nil, fmt.Errorf("Invalid log level: %s. Valid log levels are: %v",
|
||||||
|
|
Loading…
Reference in New Issue