mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
Merge pull request #8511 from hashicorp/dnephin/agent-setup
agent: extract dependency creation from New
This commit is contained in:
commit
2128f40fc2
11
agent/acl.go
11
agent/acl.go
@ -59,7 +59,7 @@ func (a *Agent) aclAccessorID(secretID string) string {
|
||||
return ident.ID()
|
||||
}
|
||||
|
||||
func (a *Agent) initializeACLs() error {
|
||||
func initializeACLs(nodeName string) (acl.Authorizer, error) {
|
||||
// Build a policy for the agent master token.
|
||||
// The builtin agent master policy allows reading any node information
|
||||
// and allows writes to the agent with the node name of the running agent
|
||||
@ -69,7 +69,7 @@ func (a *Agent) initializeACLs() error {
|
||||
PolicyRules: acl.PolicyRules{
|
||||
Agents: []*acl.AgentRule{
|
||||
{
|
||||
Node: a.config.NodeName,
|
||||
Node: nodeName,
|
||||
Policy: acl.PolicyWrite,
|
||||
},
|
||||
},
|
||||
@ -81,12 +81,7 @@ func (a *Agent) initializeACLs() error {
|
||||
},
|
||||
},
|
||||
}
|
||||
master, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.aclMasterAuthorizer = master
|
||||
return nil
|
||||
return acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
|
||||
}
|
||||
|
||||
// vetServiceRegister makes sure the service registration action is allowed by
|
||||
|
@ -35,37 +35,42 @@ type TestACLAgent struct {
|
||||
// Basically it needs a local state for some of the vet* functions, a logger and a delegate.
|
||||
// The key is that we are the delegate so we can control the ResolveToken responses
|
||||
func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzResolver, resolveIdent identResolver) *TestACLAgent {
|
||||
t.Helper()
|
||||
|
||||
a := &TestACLAgent{resolveAuthzFn: resolveAuthz, resolveIdentFn: resolveIdent}
|
||||
|
||||
dataDir := testutil.TempDir(t, "acl-agent")
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
|
||||
logBuffer := testutil.NewLogBuffer(t)
|
||||
loader := func(source config.Source) (*config.RuntimeConfig, []string, error) {
|
||||
dataDir := fmt.Sprintf(`data_dir = "%s"`, dataDir)
|
||||
opts := config.BuilderOpts{
|
||||
HCL: []string{TestConfigHCL(NodeID()), hcl, dataDir},
|
||||
}
|
||||
cfg, warnings, err := config.Load(opts, source)
|
||||
if cfg != nil {
|
||||
cfg.Telemetry.Disable = true
|
||||
}
|
||||
return cfg, warnings, err
|
||||
}
|
||||
bd, err := NewBaseDeps(loader, logBuffer)
|
||||
require.NoError(t, err)
|
||||
|
||||
bd.Logger = hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Name: name,
|
||||
Level: hclog.Debug,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
Output: logBuffer,
|
||||
TimeFormat: "04:05.000",
|
||||
})
|
||||
bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||
|
||||
opts := []AgentOption{
|
||||
WithLogger(logger),
|
||||
WithBuilderOpts(config.BuilderOpts{
|
||||
HCL: []string{
|
||||
TestConfigHCL(NodeID()),
|
||||
hcl,
|
||||
fmt.Sprintf(`data_dir = "%s"`, dataDir),
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
agent, err := New(opts...)
|
||||
agent, err := New(bd)
|
||||
require.NoError(t, err)
|
||||
cfg := agent.GetConfig()
|
||||
|
||||
agent.delegate = a
|
||||
agent.State = local.NewState(LocalConfig(bd.RuntimeConfig), bd.Logger, bd.Tokens)
|
||||
agent.State.TriggerSyncChanges = func() {}
|
||||
a.Agent = agent
|
||||
|
||||
agent.logger = logger
|
||||
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||
|
||||
a.Agent.delegate = a
|
||||
a.Agent.State = local.NewState(LocalConfig(cfg), logger, a.Agent.tokens)
|
||||
a.Agent.State.TriggerSyncChanges = func() {}
|
||||
return a
|
||||
}
|
||||
|
||||
|
267
agent/agent.go
267
agent/agent.go
@ -21,7 +21,6 @@ import (
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/mitchellh/cli"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@ -172,7 +171,7 @@ type Agent struct {
|
||||
logger hclog.InterceptLogger
|
||||
|
||||
// In-memory sink used for collecting metrics
|
||||
MemSink *metrics.InmemSink
|
||||
MemSink MetricsHandler
|
||||
|
||||
// delegate is either a *consul.Server or *consul.Client
|
||||
// depending on the configuration
|
||||
@ -312,85 +311,6 @@ type Agent struct {
|
||||
enterpriseAgent
|
||||
}
|
||||
|
||||
type agentOptions struct {
|
||||
logger hclog.InterceptLogger
|
||||
builderOpts config.BuilderOpts
|
||||
ui cli.Ui
|
||||
config *config.RuntimeConfig
|
||||
overrides []config.Source
|
||||
writers []io.Writer
|
||||
initTelemetry bool
|
||||
}
|
||||
|
||||
type AgentOption func(opt *agentOptions)
|
||||
|
||||
// WithTelemetry is used to control whether the agent will
|
||||
// set up metrics.
|
||||
func WithTelemetry(initTelemetry bool) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.initTelemetry = initTelemetry
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger is used to override any automatic logger creation
|
||||
// and provide one already built instead. This is mostly useful
|
||||
// for testing.
|
||||
func WithLogger(logger hclog.InterceptLogger) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// WithBuilderOpts specifies the command line config.BuilderOpts to use that the agent
|
||||
// is being started with
|
||||
func WithBuilderOpts(builderOpts config.BuilderOpts) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.builderOpts = builderOpts
|
||||
}
|
||||
}
|
||||
|
||||
// WithCLI provides a cli.Ui instance to use when emitting configuration
|
||||
// warnings during the first configuration parsing.
|
||||
func WithCLI(ui cli.Ui) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.ui = ui
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogWriter will add an additional log output to the logger that gets
|
||||
// configured after configuration parsing
|
||||
func WithLogWriter(writer io.Writer) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.writers = append(opt.writers, writer)
|
||||
}
|
||||
}
|
||||
|
||||
// WithOverrides is used to provide a config source to append to the tail sources
|
||||
// during config building. It is really only useful for testing to tune non-user
|
||||
// configurable tunables to make various tests converge more quickly than they
|
||||
// could otherwise.
|
||||
func WithOverrides(overrides ...config.Source) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.overrides = overrides
|
||||
}
|
||||
}
|
||||
|
||||
// WithConfig provides an already parsed configuration to the Agent
|
||||
// Deprecated: Should allow the agent to parse the configuration.
|
||||
func WithConfig(config *config.RuntimeConfig) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.config = config
|
||||
}
|
||||
}
|
||||
|
||||
func flattenAgentOptions(options []AgentOption) agentOptions {
|
||||
var flat agentOptions
|
||||
for _, opt := range options {
|
||||
opt(&flat)
|
||||
}
|
||||
return flat
|
||||
}
|
||||
|
||||
// New process the desired options and creates a new Agent.
|
||||
// This process will
|
||||
// * parse the config given the config Flags
|
||||
@ -406,10 +326,7 @@ func flattenAgentOptions(options []AgentOption) agentOptions {
|
||||
// * setup the NodeID if one isn't provided in the configuration
|
||||
// * create the AutoConfig object for future use in fully
|
||||
// resolving the configuration
|
||||
func New(options ...AgentOption) (*Agent, error) {
|
||||
flat := flattenAgentOptions(options)
|
||||
|
||||
// Create most of the agent
|
||||
func New(bd BaseDeps) (*Agent, error) {
|
||||
a := Agent{
|
||||
checkReapAfter: make(map[structs.CheckID]time.Duration),
|
||||
checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor),
|
||||
@ -425,94 +342,28 @@ func New(options ...AgentOption) (*Agent, error) {
|
||||
retryJoinCh: make(chan error),
|
||||
shutdownCh: make(chan struct{}),
|
||||
endpoints: make(map[string]string),
|
||||
tokens: new(token.Store),
|
||||
logger: flat.logger,
|
||||
|
||||
// TODO: store the BaseDeps instead of copying them over to Agent
|
||||
tokens: bd.Tokens,
|
||||
logger: bd.Logger,
|
||||
tlsConfigurator: bd.TLSConfigurator,
|
||||
config: bd.RuntimeConfig,
|
||||
cache: bd.Cache,
|
||||
MemSink: bd.MetricsHandler,
|
||||
connPool: bd.ConnPool,
|
||||
autoConf: bd.AutoConfig,
|
||||
}
|
||||
|
||||
// parse the configuration and handle the error/warnings
|
||||
cfg, warnings, err := config.Load(flat.builderOpts, nil, flat.overrides...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, w := range warnings {
|
||||
if a.logger != nil {
|
||||
a.logger.Warn(w)
|
||||
} else if flat.ui != nil {
|
||||
flat.ui.Warn(w)
|
||||
} else {
|
||||
fmt.Fprint(os.Stderr, w)
|
||||
}
|
||||
}
|
||||
|
||||
// set the config in the agent, this is just the preliminary configuration as we haven't
|
||||
// loaded any auto-config sources yet.
|
||||
a.config = cfg
|
||||
|
||||
// create the cache using the rate limiting settings from the config. Note that this means
|
||||
// that these limits are not reloadable.
|
||||
a.cache = cache.New(a.config.Cache)
|
||||
|
||||
if flat.logger == nil {
|
||||
logConf := &logging.Config{
|
||||
LogLevel: cfg.LogLevel,
|
||||
LogJSON: cfg.LogJSON,
|
||||
Name: logging.Agent,
|
||||
EnableSyslog: cfg.EnableSyslog,
|
||||
SyslogFacility: cfg.SyslogFacility,
|
||||
LogFilePath: cfg.LogFile,
|
||||
LogRotateDuration: cfg.LogRotateDuration,
|
||||
LogRotateBytes: cfg.LogRotateBytes,
|
||||
LogRotateMaxFiles: cfg.LogRotateMaxFiles,
|
||||
}
|
||||
|
||||
a.logger, err = logging.Setup(logConf, flat.writers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
grpclog.SetLoggerV2(logging.NewGRPCLogger(logConf, a.logger))
|
||||
}
|
||||
|
||||
if flat.initTelemetry {
|
||||
memSink, err := lib.InitTelemetry(cfg.Telemetry)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize telemetry: %w", err)
|
||||
}
|
||||
a.MemSink = memSink
|
||||
}
|
||||
|
||||
// TODO (autoconf) figure out how to let this setting be pushed down via autoconf
|
||||
// right now it gets defaulted if unset so this check actually doesn't do much
|
||||
// for a normal running agent.
|
||||
if a.config.Datacenter == "" {
|
||||
return nil, fmt.Errorf("Must configure a Datacenter")
|
||||
}
|
||||
if a.config.DataDir == "" && !a.config.DevMode {
|
||||
return nil, fmt.Errorf("Must configure a DataDir")
|
||||
}
|
||||
|
||||
tlsConfigurator, err := tlsutil.NewConfigurator(a.config.ToTLSUtilConfig(), a.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
a.tlsConfigurator = tlsConfigurator
|
||||
|
||||
err = a.initializeConnectionPool()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize the connection pool: %w", err)
|
||||
}
|
||||
// TODO: set globals somewhere else, not Agent.New
|
||||
grpclog.SetLoggerV2(logging.NewGRPCLogger(bd.RuntimeConfig.LogLevel, bd.Logger))
|
||||
|
||||
a.serviceManager = NewServiceManager(&a)
|
||||
|
||||
if err := a.initializeACLs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Retrieve or generate the node ID before setting up the rest of the
|
||||
// agent, which depends on it.
|
||||
cfg.NodeID, err = newNodeIDFromConfig(a.config, a.logger)
|
||||
// TODO: do this somewhere else, maybe move to newBaseDeps
|
||||
var err error
|
||||
a.aclMasterAuthorizer, err = initializeACLs(bd.RuntimeConfig.NodeName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to setup node ID: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We used to do this in the Start method. However it doesn't need to go
|
||||
@ -521,48 +372,9 @@ func New(options ...AgentOption) (*Agent, error) {
|
||||
// pass the agent itself so its safe to move here.
|
||||
a.registerCache()
|
||||
|
||||
cmConf := new(certmon.Config).
|
||||
WithCache(a.cache).
|
||||
WithTLSConfigurator(a.tlsConfigurator).
|
||||
WithDNSSANs(a.config.AutoConfig.DNSSANs).
|
||||
WithIPSANs(a.config.AutoConfig.IPSANs).
|
||||
WithDatacenter(a.config.Datacenter).
|
||||
WithNodeName(a.config.NodeName).
|
||||
WithFallback(a.autoConfigFallbackTLS).
|
||||
WithLogger(a.logger.Named(logging.AutoConfig)).
|
||||
WithTokens(a.tokens).
|
||||
WithPersistence(a.autoConfigPersist)
|
||||
acCertMon, err := certmon.New(cmConf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
acConf := autoconf.Config{
|
||||
DirectRPC: a.connPool,
|
||||
Logger: a.logger,
|
||||
CertMonitor: acCertMon,
|
||||
Loader: func(source config.Source) (*config.RuntimeConfig, []string, error) {
|
||||
return config.Load(flat.builderOpts, source, flat.overrides...)
|
||||
},
|
||||
}
|
||||
ac, err := autoconf.New(acConf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a.autoConf = ac
|
||||
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
// GetLogger retrieves the agents logger
|
||||
// TODO make export the logger field and get rid of this method
|
||||
// This is here for now to simplify the work I am doing and make
|
||||
// reviewing the final PR easier.
|
||||
func (a *Agent) GetLogger() hclog.InterceptLogger {
|
||||
return a.logger
|
||||
}
|
||||
|
||||
// GetConfig retrieves the agents config
|
||||
// TODO make export the config field and get rid of this method
|
||||
// This is here for now to simplify the work I am doing and make
|
||||
@ -573,31 +385,6 @@ func (a *Agent) GetConfig() *config.RuntimeConfig {
|
||||
return a.config
|
||||
}
|
||||
|
||||
func (a *Agent) initializeConnectionPool() error {
|
||||
var rpcSrcAddr *net.TCPAddr
|
||||
if !ipaddr.IsAny(a.config.RPCBindAddr) {
|
||||
rpcSrcAddr = &net.TCPAddr{IP: a.config.RPCBindAddr.IP}
|
||||
}
|
||||
|
||||
pool := &pool.ConnPool{
|
||||
Server: a.config.ServerMode,
|
||||
SrcAddr: rpcSrcAddr,
|
||||
Logger: a.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: a.tlsConfigurator,
|
||||
Datacenter: a.config.Datacenter,
|
||||
}
|
||||
if a.config.ServerMode {
|
||||
pool.MaxTime = 2 * time.Minute
|
||||
pool.MaxStreams = 64
|
||||
} else {
|
||||
pool.MaxTime = 127 * time.Second
|
||||
pool.MaxStreams = 32
|
||||
}
|
||||
|
||||
a.connPool = pool
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
|
||||
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
||||
lc := local.Config{
|
||||
@ -638,8 +425,8 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||
return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
|
||||
}
|
||||
|
||||
// load the tokens - this requires the logger to be setup
|
||||
// which is why we can't do this in New
|
||||
// TODO: move to newBaseDeps
|
||||
// TODO: handle error
|
||||
a.loadTokens(a.config)
|
||||
a.loadEnterpriseTokens(a.config)
|
||||
|
||||
@ -858,20 +645,6 @@ func (a *Agent) autoEncryptInitialCertificate(ctx context.Context) (*structs.Sig
|
||||
return client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken(), a.config.AutoEncryptDNSSAN, a.config.AutoEncryptIPSAN)
|
||||
}
|
||||
|
||||
func (a *Agent) autoConfigFallbackTLS(ctx context.Context) (*structs.SignedResponse, error) {
|
||||
if a.autoConf == nil {
|
||||
return nil, fmt.Errorf("AutoConfig manager has not been created yet")
|
||||
}
|
||||
return a.autoConf.FallbackTLS(ctx)
|
||||
}
|
||||
|
||||
func (a *Agent) autoConfigPersist(resp *structs.SignedResponse) error {
|
||||
if a.autoConf == nil {
|
||||
return fmt.Errorf("AutoConfig manager has not been created yet")
|
||||
}
|
||||
return a.autoConf.RecordUpdatedCerts(resp)
|
||||
}
|
||||
|
||||
func (a *Agent) listenAndServeGRPC() error {
|
||||
if len(a.config.GRPCAddrs) < 1 {
|
||||
return nil
|
||||
|
@ -35,11 +35,11 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// LoadConfig will build the configuration including the extraHead source injected
|
||||
// Load will build the configuration including the extraHead source injected
|
||||
// after all other defaults but before any user supplied configuration and the overrides
|
||||
// source injected as the final source in the configuration parsing chain.
|
||||
func Load(builderOpts BuilderOpts, extraHead Source, overrides ...Source) (*RuntimeConfig, []string, error) {
|
||||
b, err := NewBuilder(builderOpts)
|
||||
func Load(opts BuilderOpts, extraHead Source, overrides ...Source) (*RuntimeConfig, []string, error) {
|
||||
b, err := NewBuilder(opts)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -7100,6 +7100,7 @@ func TestSanitize(t *testing.T) {
|
||||
"CirconusCheckTags": "",
|
||||
"CirconusSubmissionInterval": "",
|
||||
"CirconusSubmissionURL": "",
|
||||
"Disable": false,
|
||||
"DisableHostname": false,
|
||||
"DogstatsdAddr": "",
|
||||
"DogstatsdTags": [],
|
||||
|
168
agent/setup.go
Normal file
168
agent/setup.go
Normal file
@ -0,0 +1,168 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
certmon "github.com/hashicorp/consul/agent/cert-monitor"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// TODO: BaseDeps should be renamed in the future once more of Agent.Start
|
||||
// has been moved out in front of Agent.New, and we can better see the setup
|
||||
// dependencies.
|
||||
type BaseDeps struct {
|
||||
Logger hclog.InterceptLogger
|
||||
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
|
||||
MetricsHandler MetricsHandler
|
||||
RuntimeConfig *config.RuntimeConfig
|
||||
Tokens *token.Store
|
||||
Cache *cache.Cache
|
||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||
ConnPool *pool.ConnPool // TODO: use an interface
|
||||
}
|
||||
|
||||
// MetricsHandler provides an http.Handler for displaying metrics.
|
||||
type MetricsHandler interface {
|
||||
DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
|
||||
}
|
||||
|
||||
type ConfigLoader func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error)
|
||||
|
||||
func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) {
|
||||
d := BaseDeps{}
|
||||
cfg, warnings, err := configLoader(nil)
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
// TODO: use logging.Config in RuntimeConfig instead of separate fields
|
||||
logConf := &logging.Config{
|
||||
LogLevel: cfg.LogLevel,
|
||||
LogJSON: cfg.LogJSON,
|
||||
Name: logging.Agent,
|
||||
EnableSyslog: cfg.EnableSyslog,
|
||||
SyslogFacility: cfg.SyslogFacility,
|
||||
LogFilePath: cfg.LogFile,
|
||||
LogRotateDuration: cfg.LogRotateDuration,
|
||||
LogRotateBytes: cfg.LogRotateBytes,
|
||||
LogRotateMaxFiles: cfg.LogRotateMaxFiles,
|
||||
}
|
||||
d.Logger, err = logging.Setup(logConf, []io.Writer{logOut})
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
for _, w := range warnings {
|
||||
d.Logger.Warn(w)
|
||||
}
|
||||
|
||||
cfg.NodeID, err = newNodeIDFromConfig(cfg, d.Logger)
|
||||
if err != nil {
|
||||
return d, fmt.Errorf("failed to setup node ID: %w", err)
|
||||
}
|
||||
|
||||
d.MetricsHandler, err = lib.InitTelemetry(cfg.Telemetry)
|
||||
if err != nil {
|
||||
return d, fmt.Errorf("failed to initialize telemetry: %w", err)
|
||||
}
|
||||
|
||||
d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.ToTLSUtilConfig(), d.Logger)
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
d.RuntimeConfig = cfg
|
||||
d.Tokens = new(token.Store)
|
||||
// cache-types are not registered yet, but they won't be used until the components are started.
|
||||
d.Cache = cache.New(cfg.Cache)
|
||||
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
|
||||
|
||||
deferredAC := &deferredAutoConfig{}
|
||||
|
||||
cmConf := new(certmon.Config).
|
||||
WithCache(d.Cache).
|
||||
WithTLSConfigurator(d.TLSConfigurator).
|
||||
WithDNSSANs(cfg.AutoConfig.DNSSANs).
|
||||
WithIPSANs(cfg.AutoConfig.IPSANs).
|
||||
WithDatacenter(cfg.Datacenter).
|
||||
WithNodeName(cfg.NodeName).
|
||||
WithFallback(deferredAC.autoConfigFallbackTLS).
|
||||
WithLogger(d.Logger.Named(logging.AutoConfig)).
|
||||
WithTokens(d.Tokens).
|
||||
WithPersistence(deferredAC.autoConfigPersist)
|
||||
acCertMon, err := certmon.New(cmConf)
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
acConf := autoconf.Config{
|
||||
DirectRPC: d.ConnPool,
|
||||
Logger: d.Logger,
|
||||
CertMonitor: acCertMon,
|
||||
Loader: configLoader,
|
||||
}
|
||||
d.AutoConfig, err = autoconf.New(acConf)
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
// TODO: can this cyclic dependency be un-cycled?
|
||||
deferredAC.autoConf = d.AutoConfig
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
|
||||
var rpcSrcAddr *net.TCPAddr
|
||||
if !ipaddr.IsAny(config.RPCBindAddr) {
|
||||
rpcSrcAddr = &net.TCPAddr{IP: config.RPCBindAddr.IP}
|
||||
}
|
||||
|
||||
pool := &pool.ConnPool{
|
||||
Server: config.ServerMode,
|
||||
SrcAddr: rpcSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: config.Datacenter,
|
||||
}
|
||||
if config.ServerMode {
|
||||
pool.MaxTime = 2 * time.Minute
|
||||
pool.MaxStreams = 64
|
||||
} else {
|
||||
pool.MaxTime = 127 * time.Second
|
||||
pool.MaxStreams = 32
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
type deferredAutoConfig struct {
|
||||
autoConf *autoconf.AutoConfig // TODO: use an interface
|
||||
}
|
||||
|
||||
func (a *deferredAutoConfig) autoConfigFallbackTLS(ctx context.Context) (*structs.SignedResponse, error) {
|
||||
if a.autoConf == nil {
|
||||
return nil, fmt.Errorf("AutoConfig manager has not been created yet")
|
||||
}
|
||||
return a.autoConf.FallbackTLS(ctx)
|
||||
}
|
||||
|
||||
func (a *deferredAutoConfig) autoConfigPersist(resp *structs.SignedResponse) error {
|
||||
if a.autoConf == nil {
|
||||
return fmt.Errorf("AutoConfig manager has not been created yet")
|
||||
}
|
||||
return a.autoConf.RecordUpdatedCerts(resp)
|
||||
}
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/hashicorp/errwrap"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
@ -167,38 +168,39 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
||||
portsConfig, returnPortsFn := randomPortsSource(a.UseTLS)
|
||||
t.Cleanup(returnPortsFn)
|
||||
|
||||
nodeID := NodeID()
|
||||
|
||||
opts := []AgentOption{
|
||||
WithLogger(logger),
|
||||
WithBuilderOpts(config.BuilderOpts{
|
||||
HCL: []string{
|
||||
TestConfigHCL(nodeID),
|
||||
portsConfig,
|
||||
a.HCL,
|
||||
hclDataDir,
|
||||
},
|
||||
}),
|
||||
WithOverrides(config.FileSource{
|
||||
// Create NodeID outside the closure, so that it does not change
|
||||
testHCLConfig := TestConfigHCL(NodeID())
|
||||
loader := func(source config.Source) (*config.RuntimeConfig, []string, error) {
|
||||
opts := config.BuilderOpts{
|
||||
HCL: []string{testHCLConfig, portsConfig, a.HCL, hclDataDir},
|
||||
}
|
||||
overrides := []config.Source{
|
||||
config.FileSource{
|
||||
Name: "test-overrides",
|
||||
Format: "hcl",
|
||||
Data: a.Overrides},
|
||||
config.DefaultConsulSource(),
|
||||
config.DevConsulSource(),
|
||||
),
|
||||
}
|
||||
cfg, warnings, err := config.Load(opts, source, overrides...)
|
||||
if cfg != nil {
|
||||
cfg.Telemetry.Disable = true
|
||||
}
|
||||
return cfg, warnings, err
|
||||
}
|
||||
bd, err := NewBaseDeps(loader, logOutput)
|
||||
require.NoError(t, err)
|
||||
|
||||
agent, err := New(opts...)
|
||||
bd.Logger = logger
|
||||
bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||
a.Config = bd.RuntimeConfig
|
||||
|
||||
agent, err := New(bd)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error creating agent: %s", err)
|
||||
}
|
||||
|
||||
a.Config = agent.GetConfig()
|
||||
|
||||
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||
|
||||
id := string(a.Config.NodeID)
|
||||
|
||||
if err := agent.Start(context.Background()); err != nil {
|
||||
agent.ShutdownAgent()
|
||||
agent.ShutdownEndpoints()
|
||||
|
@ -60,11 +60,6 @@ type cmd struct {
|
||||
logger hclog.InterceptLogger
|
||||
}
|
||||
|
||||
type GatedUi struct {
|
||||
JSONoutput bool
|
||||
ui cli.Ui
|
||||
}
|
||||
|
||||
func (c *cmd) init() {
|
||||
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
||||
config.AddFlags(c.flags, &c.flagArgs)
|
||||
@ -165,25 +160,26 @@ func (c *cmd) run(args []string) int {
|
||||
return 1
|
||||
}
|
||||
|
||||
logGate := logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
||||
|
||||
agentOptions := []agent.AgentOption{
|
||||
agent.WithBuilderOpts(c.flagArgs),
|
||||
agent.WithCLI(c.UI),
|
||||
agent.WithLogWriter(&logGate),
|
||||
agent.WithTelemetry(true),
|
||||
logGate := &logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
||||
loader := func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error) {
|
||||
return config.Load(c.flagArgs, source)
|
||||
}
|
||||
|
||||
agent, err := agent.New(agentOptions...)
|
||||
bd, err := agent.NewBaseDeps(loader, logGate)
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
config := agent.GetConfig()
|
||||
c.logger = agent.GetLogger()
|
||||
c.logger = bd.Logger
|
||||
agent, err := agent.New(bd)
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
//Setup gate to check if we should output CLI information
|
||||
config := bd.RuntimeConfig
|
||||
|
||||
// Setup gate to check if we should output CLI information
|
||||
cli := GatedUi{
|
||||
JSONoutput: config.LogJSON,
|
||||
ui: c.UI,
|
||||
@ -344,6 +340,11 @@ func (c *cmd) run(args []string) int {
|
||||
}
|
||||
}
|
||||
|
||||
type GatedUi struct {
|
||||
JSONoutput bool
|
||||
ui cli.Ui
|
||||
}
|
||||
|
||||
func (g *GatedUi) output(s string) {
|
||||
if !g.JSONoutput {
|
||||
g.ui.Output(s)
|
||||
|
@ -19,6 +19,10 @@ import (
|
||||
// the shared InitTelemetry functions below, but we can't import agent/config
|
||||
// due to a dependency cycle.
|
||||
type TelemetryConfig struct {
|
||||
// Disable may be set to true to have InitTelemetry to skip initialization
|
||||
// and return a nil MetricsSink.
|
||||
Disable bool
|
||||
|
||||
// Circonus*: see https://github.com/circonus-labs/circonus-gometrics
|
||||
// for more details on the various configuration options.
|
||||
// Valid configuration combinations:
|
||||
@ -326,6 +330,9 @@ func circonusSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, err
|
||||
// InitTelemetry configures go-metrics based on map of telemetry config
|
||||
// values as returned by Runtimecfg.Config().
|
||||
func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) {
|
||||
if cfg.Disable {
|
||||
return nil, nil
|
||||
}
|
||||
// Setup telemetry
|
||||
// Aggregate on 10 second intervals for 1 minute. Expose the
|
||||
// metrics over stderr when there is a SIGUSR1 received.
|
||||
|
@ -19,9 +19,9 @@ type GRPCLogger struct {
|
||||
// Note that grpclog has Info, Warning, Error, Fatal severity levels AND integer
|
||||
// verbosity levels for additional info. Verbose logs in glog are always INFO
|
||||
// severity so we map Info,V0 to INFO, Info,V1 to DEBUG, and Info,V>1 to TRACE.
|
||||
func NewGRPCLogger(config *Config, logger hclog.Logger) *GRPCLogger {
|
||||
func NewGRPCLogger(logLevel string, logger hclog.Logger) *GRPCLogger {
|
||||
return &GRPCLogger{
|
||||
level: config.LogLevel,
|
||||
level: logLevel,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ func TestGRPCLogger(t *testing.T) {
|
||||
Output: &out,
|
||||
TimeFormat: "timeformat",
|
||||
})
|
||||
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: "TRACE"}, logger))
|
||||
grpclog.SetLoggerV2(NewGRPCLogger("TRACE", logger))
|
||||
|
||||
// All of these should output something
|
||||
grpclog.Info("Info,")
|
||||
@ -92,7 +92,7 @@ func TestGRPCLogger_V(t *testing.T) {
|
||||
Level: hclog.Trace,
|
||||
Output: &out,
|
||||
})
|
||||
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: tt.level}, logger))
|
||||
grpclog.SetLoggerV2(NewGRPCLogger(tt.level, logger))
|
||||
|
||||
assert.Equal(t, tt.want, grpclog.V(tt.v))
|
||||
})
|
||||
|
@ -63,6 +63,7 @@ type LogSetupErrorFn func(string)
|
||||
// The provided ui object will get any log messages related to setting up
|
||||
// logging itself, and will also be hooked up to the gated logger. The final bool
|
||||
// parameter indicates if logging was set up successfully.
|
||||
// TODO: accept a single io.Writer
|
||||
func Setup(config *Config, writers []io.Writer) (hclog.InterceptLogger, error) {
|
||||
if !ValidateLogLevel(config.LogLevel) {
|
||||
return nil, fmt.Errorf("Invalid log level: %s. Valid log levels are: %v",
|
||||
|
Loading…
x
Reference in New Issue
Block a user