mirror of https://github.com/status-im/consul.git
Merge pull request #8086 from hashicorp/feature/auto-config/client-config-inject
This commit is contained in:
commit
abce1f0eee
|
@ -187,11 +187,11 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru
|
|||
// Vet the check itself.
|
||||
if len(check.ServiceName) > 0 {
|
||||
if authz.ServiceWrite(check.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
return acl.PermissionDenied("Missing service:write on %v", structs.ServiceIDString(check.ServiceName, &check.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
return acl.PermissionDenied("Missing node:write on %s", a.config.NodeName)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -199,11 +199,11 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru
|
|||
if existing := a.State.Check(check.CompoundCheckID()); existing != nil {
|
||||
if len(existing.ServiceName) > 0 {
|
||||
if authz.ServiceWrite(existing.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
return acl.PermissionDenied("Missing service:write on %s", structs.ServiceIDString(existing.ServiceName, &existing.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
return acl.PermissionDenied("Missing node:write on %s", a.config.NodeName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ type TestACLAgent struct {
|
|||
// The key is that we are the delegate so we can control the ResolveToken responses
|
||||
func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzResolver, resolveIdent identResolver) *TestACLAgent {
|
||||
a := &TestACLAgent{Name: name, HCL: hcl, resolveAuthzFn: resolveAuthz, resolveIdentFn: resolveIdent}
|
||||
hclDataDir := `data_dir = "acl-agent"`
|
||||
dataDir := `data_dir = "acl-agent"`
|
||||
|
||||
logOutput := testutil.TestWriter(t)
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
|
@ -66,13 +66,20 @@ func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzRe
|
|||
Output: logOutput,
|
||||
})
|
||||
|
||||
a.Config = TestConfig(logger,
|
||||
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
|
||||
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
|
||||
)
|
||||
opts := []AgentOption{
|
||||
WithLogger(logger),
|
||||
WithBuilderOpts(config.BuilderOpts{
|
||||
HCL: []string{
|
||||
TestConfigHCL(NodeID()),
|
||||
a.HCL,
|
||||
dataDir,
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
agent, err := New(a.Config, logger)
|
||||
agent, err := New(opts...)
|
||||
require.NoError(t, err)
|
||||
a.Config = agent.GetConfig()
|
||||
a.Agent = agent
|
||||
|
||||
agent.LogOutput = logOutput
|
||||
|
@ -258,7 +265,7 @@ var (
|
|||
nodeRWSecret: {
|
||||
token: structs.ACLToken{
|
||||
AccessorID: "efb6b7d5-d343-47c1-b4cb-aa6b94d2f490",
|
||||
SecretID: nodeROSecret,
|
||||
SecretID: nodeRWSecret,
|
||||
},
|
||||
rules: `node_prefix "Node" { policy = "write" }`,
|
||||
},
|
||||
|
|
317
agent/agent.go
317
agent/agent.go
|
@ -21,18 +21,22 @@ import (
|
|||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/mitchellh/cli"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/ae"
|
||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/systemd"
|
||||
|
@ -160,6 +164,8 @@ type notifier interface {
|
|||
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||
// requests to other Consul servers.
|
||||
type Agent struct {
|
||||
autoConf *autoconf.AutoConfig
|
||||
|
||||
// config is the agent configuration.
|
||||
config *config.RuntimeConfig
|
||||
|
||||
|
@ -247,8 +253,6 @@ type Agent struct {
|
|||
eventLock sync.RWMutex
|
||||
eventNotify NotifyGroup
|
||||
|
||||
reloadCh chan chan error
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
|
@ -313,22 +317,103 @@ type Agent struct {
|
|||
// IP.
|
||||
httpConnLimiter connlimit.Limiter
|
||||
|
||||
// Connection Pool
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
||||
enterpriseAgent
|
||||
}
|
||||
|
||||
// New verifies the configuration given has a Datacenter and DataDir
|
||||
// configured, and maps the remaining config fields to fields on the Agent.
|
||||
func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error) {
|
||||
if c.Datacenter == "" {
|
||||
return nil, fmt.Errorf("Must configure a Datacenter")
|
||||
}
|
||||
if c.DataDir == "" && !c.DevMode {
|
||||
return nil, fmt.Errorf("Must configure a DataDir")
|
||||
}
|
||||
type agentOptions struct {
|
||||
logger hclog.InterceptLogger
|
||||
builderOpts config.BuilderOpts
|
||||
ui cli.Ui
|
||||
config *config.RuntimeConfig
|
||||
overrides []config.Source
|
||||
writers []io.Writer
|
||||
}
|
||||
|
||||
type AgentOption func(opt *agentOptions)
|
||||
|
||||
// WithLogger is used to override any automatic logger creation
|
||||
// and provide one already built instead. This is mostly useful
|
||||
// for testing.
|
||||
func WithLogger(logger hclog.InterceptLogger) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// WithBuilderOpts specifies the command line config.BuilderOpts to use that the agent
|
||||
// is being started with
|
||||
func WithBuilderOpts(builderOpts config.BuilderOpts) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.builderOpts = builderOpts
|
||||
}
|
||||
}
|
||||
|
||||
// WithCLI provides a cli.Ui instance to use when emitting configuration
|
||||
// warnings during the first configuration parsing.
|
||||
func WithCLI(ui cli.Ui) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.ui = ui
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogWriter will add an additional log output to the logger that gets
|
||||
// configured after configuration parsing
|
||||
func WithLogWriter(writer io.Writer) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.writers = append(opt.writers, writer)
|
||||
}
|
||||
}
|
||||
|
||||
// WithOverrides is used to provide a config source to append to the tail sources
|
||||
// during config building. It is really only useful for testing to tune non-user
|
||||
// configurable tunables to make various tests converge more quickly than they
|
||||
// could otherwise.
|
||||
func WithOverrides(overrides ...config.Source) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.overrides = overrides
|
||||
}
|
||||
}
|
||||
|
||||
// WithConfig provides an already parsed configuration to the Agent
|
||||
// Deprecated: Should allow the agent to parse the configuration.
|
||||
func WithConfig(config *config.RuntimeConfig) AgentOption {
|
||||
return func(opt *agentOptions) {
|
||||
opt.config = config
|
||||
}
|
||||
}
|
||||
|
||||
func flattenAgentOptions(options []AgentOption) agentOptions {
|
||||
var flat agentOptions
|
||||
for _, opt := range options {
|
||||
opt(&flat)
|
||||
}
|
||||
return flat
|
||||
}
|
||||
|
||||
// New process the desired options and creates a new Agent.
|
||||
// This process will
|
||||
// * parse the config given the config Flags
|
||||
// * setup logging
|
||||
// * using predefined logger given in an option
|
||||
// OR
|
||||
// * initialize a new logger from the configuration
|
||||
// including setting up gRPC logging
|
||||
// * initialize telemetry
|
||||
// * create a TLS Configurator
|
||||
// * build a shared connection pool
|
||||
// * create the ServiceManager
|
||||
// * setup the NodeID if one isn't provided in the configuration
|
||||
// * create the AutoConfig object for future use in fully
|
||||
// resolving the configuration
|
||||
func New(options ...AgentOption) (*Agent, error) {
|
||||
flat := flattenAgentOptions(options)
|
||||
|
||||
// Create most of the agent
|
||||
a := Agent{
|
||||
config: c,
|
||||
checkReapAfter: make(map[structs.CheckID]time.Duration),
|
||||
checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor),
|
||||
checkTTLs: make(map[structs.CheckID]*checks.CheckTTL),
|
||||
|
@ -340,14 +425,84 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error)
|
|||
eventCh: make(chan serf.UserEvent, 1024),
|
||||
eventBuf: make([]*UserEvent, 256),
|
||||
joinLANNotifier: &systemd.Notifier{},
|
||||
reloadCh: make(chan chan error),
|
||||
retryJoinCh: make(chan error),
|
||||
shutdownCh: make(chan struct{}),
|
||||
InterruptStartCh: make(chan struct{}),
|
||||
endpoints: make(map[string]string),
|
||||
tokens: new(token.Store),
|
||||
logger: logger,
|
||||
logger: flat.logger,
|
||||
}
|
||||
|
||||
// parse the configuration and handle the error/warnings
|
||||
config, warnings, err := autoconf.LoadConfig(flat.builderOpts, config.Source{}, flat.overrides...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, w := range warnings {
|
||||
if a.logger != nil {
|
||||
a.logger.Warn(w)
|
||||
} else if flat.ui != nil {
|
||||
flat.ui.Warn(w)
|
||||
} else {
|
||||
fmt.Fprint(os.Stderr, w)
|
||||
}
|
||||
}
|
||||
|
||||
// set the config in the agent, this is just the preliminary configuration as we haven't
|
||||
// loaded any auto-config sources yet.
|
||||
a.config = config
|
||||
|
||||
if flat.logger == nil {
|
||||
logConf := &logging.Config{
|
||||
LogLevel: config.LogLevel,
|
||||
LogJSON: config.LogJSON,
|
||||
Name: logging.Agent,
|
||||
EnableSyslog: config.EnableSyslog,
|
||||
SyslogFacility: config.SyslogFacility,
|
||||
LogFilePath: config.LogFile,
|
||||
LogRotateDuration: config.LogRotateDuration,
|
||||
LogRotateBytes: config.LogRotateBytes,
|
||||
LogRotateMaxFiles: config.LogRotateMaxFiles,
|
||||
}
|
||||
|
||||
logger, logOutput, err := logging.Setup(logConf, flat.writers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a.logger = logger
|
||||
a.LogOutput = logOutput
|
||||
|
||||
grpclog.SetLoggerV2(logging.NewGRPCLogger(logConf, a.logger))
|
||||
}
|
||||
|
||||
memSink, err := lib.InitTelemetry(config.Telemetry)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize telemetry: %w", err)
|
||||
}
|
||||
a.MemSink = memSink
|
||||
|
||||
// TODO (autoconf) figure out how to let this setting be pushed down via autoconf
|
||||
// right now it gets defaulted if unset so this check actually doesn't do much
|
||||
// for a normal running agent.
|
||||
if a.config.Datacenter == "" {
|
||||
return nil, fmt.Errorf("Must configure a Datacenter")
|
||||
}
|
||||
if a.config.DataDir == "" && !a.config.DevMode {
|
||||
return nil, fmt.Errorf("Must configure a DataDir")
|
||||
}
|
||||
|
||||
tlsConfigurator, err := tlsutil.NewConfigurator(a.config.ToTLSUtilConfig(), a.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
a.tlsConfigurator = tlsConfigurator
|
||||
|
||||
err = a.initializeConnectionPool()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize the connection pool: %w", err)
|
||||
}
|
||||
|
||||
a.serviceManager = NewServiceManager(&a)
|
||||
|
||||
if err := a.initializeACLs(); err != nil {
|
||||
|
@ -356,13 +511,76 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error)
|
|||
|
||||
// Retrieve or generate the node ID before setting up the rest of the
|
||||
// agent, which depends on it.
|
||||
if err := a.setupNodeID(c); err != nil {
|
||||
if err := a.setupNodeID(a.config); err != nil {
|
||||
return nil, fmt.Errorf("Failed to setup node ID: %v", err)
|
||||
}
|
||||
|
||||
acOpts := []autoconf.Option{
|
||||
autoconf.WithDirectRPC(a.connPool),
|
||||
autoconf.WithTLSConfigurator(a.tlsConfigurator),
|
||||
autoconf.WithBuilderOpts(flat.builderOpts),
|
||||
autoconf.WithLogger(a.logger),
|
||||
autoconf.WithOverrides(flat.overrides...),
|
||||
}
|
||||
ac, err := autoconf.New(acOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a.autoConf = ac
|
||||
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
// GetLogger retrieves the agents logger
|
||||
// TODO make export the logger field and get rid of this method
|
||||
// This is here for now to simplify the work I am doing and make
|
||||
// reviewing the final PR easier.
|
||||
func (a *Agent) GetLogger() hclog.InterceptLogger {
|
||||
return a.logger
|
||||
}
|
||||
|
||||
// GetConfig retrieves the agents config
|
||||
// TODO make export the config field and get rid of this method
|
||||
// This is here for now to simplify the work I am doing and make
|
||||
// reviewing the final PR easier.
|
||||
func (a *Agent) GetConfig() *config.RuntimeConfig {
|
||||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
return a.config
|
||||
}
|
||||
|
||||
func (a *Agent) initializeConnectionPool() error {
|
||||
var rpcSrcAddr *net.TCPAddr
|
||||
if !ipaddr.IsAny(a.config.RPCBindAddr) {
|
||||
rpcSrcAddr = &net.TCPAddr{IP: a.config.RPCBindAddr.IP}
|
||||
}
|
||||
|
||||
// Ensure we have a log output for the connection pool.
|
||||
logOutput := a.LogOutput
|
||||
if logOutput == nil {
|
||||
logOutput = os.Stderr
|
||||
}
|
||||
|
||||
pool := &pool.ConnPool{
|
||||
Server: a.config.ServerMode,
|
||||
SrcAddr: rpcSrcAddr,
|
||||
LogOutput: logOutput,
|
||||
TLSConfigurator: a.tlsConfigurator,
|
||||
Datacenter: a.config.Datacenter,
|
||||
}
|
||||
if a.config.ServerMode {
|
||||
pool.MaxTime = 2 * time.Minute
|
||||
pool.MaxStreams = 64
|
||||
} else {
|
||||
pool.MaxTime = 127 * time.Second
|
||||
pool.MaxStreams = 32
|
||||
}
|
||||
|
||||
a.connPool = pool
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
|
||||
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
||||
lc := local.Config{
|
||||
|
@ -385,7 +603,23 @@ func (a *Agent) Start() error {
|
|||
a.stateLock.Lock()
|
||||
defer a.stateLock.Unlock()
|
||||
|
||||
c := a.config
|
||||
// This needs to be done early on as it will potentially alter the configuration
|
||||
// and then how other bits are brought up
|
||||
c, err := a.autoConf.InitialConfiguration(&lib.StopChannelContext{StopCh: a.shutdownCh})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// copy over the existing node id, this cannot be
|
||||
// changed while running anyways but this prevents
|
||||
// breaking some existing behavior. then overwrite
|
||||
// the configuration
|
||||
c.NodeID = a.config.NodeID
|
||||
a.config = c
|
||||
|
||||
if err := a.tlsConfigurator.Update(a.config.ToTLSUtilConfig()); err != nil {
|
||||
return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
|
||||
}
|
||||
|
||||
if err := a.CheckSecurity(c); err != nil {
|
||||
a.logger.Error("Security error while parsing configuration: %#v", err)
|
||||
|
@ -438,21 +672,22 @@ func (a *Agent) Start() error {
|
|||
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
|
||||
}
|
||||
|
||||
tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), a.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
options := []consul.ConsulOption{
|
||||
consul.WithLogger(a.logger),
|
||||
consul.WithTokenStore(a.tokens),
|
||||
consul.WithTLSConfigurator(a.tlsConfigurator),
|
||||
consul.WithConnectionPool(a.connPool),
|
||||
}
|
||||
a.tlsConfigurator = tlsConfigurator
|
||||
|
||||
// Setup either the client or the server.
|
||||
if c.ServerMode {
|
||||
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens, a.tlsConfigurator)
|
||||
server, err := consul.NewServerWithOptions(consulCfg, options...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul server: %v", err)
|
||||
}
|
||||
a.delegate = server
|
||||
} else {
|
||||
client, err := consul.NewClientLogger(consulCfg, a.logger, a.tlsConfigurator)
|
||||
client, err := consul.NewClientWithOptions(consulCfg, options...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul client: %v", err)
|
||||
}
|
||||
|
@ -1873,12 +2108,6 @@ func (a *Agent) ShutdownEndpoints() {
|
|||
a.logger.Info("Endpoints down")
|
||||
}
|
||||
|
||||
// ReloadCh is used to return a channel that can be
|
||||
// used for triggering reloads and returning a response.
|
||||
func (a *Agent) ReloadCh() chan chan error {
|
||||
return a.reloadCh
|
||||
}
|
||||
|
||||
// RetryJoinCh is a channel that transports errors
|
||||
// from the retry join process.
|
||||
func (a *Agent) RetryJoinCh() <-chan error {
|
||||
|
@ -4052,14 +4281,40 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
|
|||
a.config.RPCMaxBurst = conf.RPCMaxBurst
|
||||
}
|
||||
|
||||
// ReloadConfig will atomically reload all configs from the given newCfg,
|
||||
// including all services, checks, tokens, metadata, dnsServer configs, etc.
|
||||
// ReloadConfig will atomically reload all configuration, including
|
||||
// all services, checks, tokens, metadata, dnsServer configs, etc.
|
||||
// It will also reload all ongoing watches.
|
||||
func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||
func (a *Agent) ReloadConfig() error {
|
||||
newCfg, err := a.autoConf.ReadConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// copy over the existing node id, this cannot be
|
||||
// changed while running anyways but this prevents
|
||||
// breaking some existing behavior.
|
||||
newCfg.NodeID = a.config.NodeID
|
||||
|
||||
return a.reloadConfigInternal(newCfg)
|
||||
}
|
||||
|
||||
// reloadConfigInternal is mainly needed for some unit tests. Instead of parsing
|
||||
// the configuration using CLI flags and on disk config, this just takes a
|
||||
// runtime configuration and applies it.
|
||||
func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
||||
if err := a.CheckSecurity(newCfg); err != nil {
|
||||
a.logger.Error("Security error while reloading configuration: %#v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Change the log level and update it
|
||||
if logging.ValidateLogLevel(newCfg.LogLevel) {
|
||||
a.logger.SetLevel(logging.LevelFromString(newCfg.LogLevel))
|
||||
} else {
|
||||
a.logger.Warn("Invalid log level in new configuration", "level", newCfg.LogLevel)
|
||||
newCfg.LogLevel = a.config.LogLevel
|
||||
}
|
||||
|
||||
// Bulk update the services and checks
|
||||
a.PauseSync()
|
||||
defer a.ResumeSync()
|
||||
|
|
|
@ -154,21 +154,7 @@ func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (i
|
|||
return nil, acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
// Trigger the reload
|
||||
errCh := make(chan error)
|
||||
select {
|
||||
case <-s.agent.shutdownCh:
|
||||
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
||||
case s.agent.reloadCh <- errCh:
|
||||
}
|
||||
|
||||
// Wait for the result of the reload, or for the agent to shutdown
|
||||
select {
|
||||
case <-s.agent.shutdownCh:
|
||||
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
||||
case err := <-errCh:
|
||||
return nil, err
|
||||
}
|
||||
return nil, s.agent.ReloadConfig()
|
||||
}
|
||||
|
||||
func buildAgentService(s *structs.NodeService) api.AgentService {
|
||||
|
|
|
@ -501,7 +501,7 @@ func TestAgent_Service(t *testing.T) {
|
|||
updateFunc: func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Reload
|
||||
require.NoError(t, a.ReloadConfig(a.Config))
|
||||
require.NoError(t, a.reloadConfigInternal(a.Config))
|
||||
},
|
||||
// Should eventually timeout since there is no actual change
|
||||
wantWait: 200 * time.Millisecond,
|
||||
|
@ -519,7 +519,7 @@ func TestAgent_Service(t *testing.T) {
|
|||
// Reload
|
||||
newConfig := *a.Config
|
||||
newConfig.Services = append(newConfig.Services, &updatedProxy)
|
||||
require.NoError(t, a.ReloadConfig(&newConfig))
|
||||
require.NoError(t, a.reloadConfigInternal(&newConfig))
|
||||
},
|
||||
wantWait: 100 * time.Millisecond,
|
||||
wantCode: 200,
|
||||
|
@ -1352,7 +1352,7 @@ func TestAgent_Reload(t *testing.T) {
|
|||
`,
|
||||
})
|
||||
|
||||
if err := a.ReloadConfig(cfg2); err != nil {
|
||||
if err := a.reloadConfigInternal(cfg2); err != nil {
|
||||
t.Fatalf("got error %v want nil", err)
|
||||
}
|
||||
if a.State.Service(structs.NewServiceID("redis-reloaded", nil)) == nil {
|
||||
|
@ -1505,7 +1505,7 @@ func TestAgent_ReloadDoesNotTriggerWatch(t *testing.T) {
|
|||
// We check that reload does not go to critical
|
||||
ensureNothingCritical(r, "red-is-dead")
|
||||
|
||||
if err := a.ReloadConfig(cfg2); err != nil {
|
||||
if err := a.reloadConfigInternal(cfg2); err != nil {
|
||||
t.Fatalf("got error %v want nil", err)
|
||||
}
|
||||
|
||||
|
@ -5194,7 +5194,11 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
|||
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
a := NewTestAgent(t, "")
|
||||
a := StartTestAgent(t, TestAgent{Overrides: `
|
||||
connect {
|
||||
test_ca_leaf_root_change_spread = "1ns"
|
||||
}
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
testrpc.WaitForActiveCARoot(t, a.RPC, "dc1", nil)
|
||||
|
@ -5297,7 +5301,11 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
|||
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
a := NewTestAgent(t, "")
|
||||
a := StartTestAgent(t, TestAgent{Overrides: `
|
||||
connect {
|
||||
test_ca_leaf_root_change_spread = "1ns"
|
||||
}
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
testrpc.WaitForActiveCARoot(t, a.RPC, "dc1", nil)
|
||||
|
@ -5421,6 +5429,10 @@ func TestAgentConnectCALeafCert_secondaryDC_good(t *testing.T) {
|
|||
a1 := StartTestAgent(t, TestAgent{Name: "dc1", HCL: `
|
||||
datacenter = "dc1"
|
||||
primary_datacenter = "dc1"
|
||||
`, Overrides: `
|
||||
connect {
|
||||
test_ca_leaf_root_change_spread = "1ns"
|
||||
}
|
||||
`})
|
||||
defer a1.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a1.RPC, "dc1")
|
||||
|
@ -5428,6 +5440,10 @@ func TestAgentConnectCALeafCert_secondaryDC_good(t *testing.T) {
|
|||
a2 := StartTestAgent(t, TestAgent{Name: "dc2", HCL: `
|
||||
datacenter = "dc2"
|
||||
primary_datacenter = "dc1"
|
||||
`, Overrides: `
|
||||
connect {
|
||||
test_ca_leaf_root_change_spread = "1ns"
|
||||
}
|
||||
`})
|
||||
defer a2.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a2.RPC, "dc2")
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/internal/go-sso/oidcauth/oidcauthtest"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
|
@ -36,6 +38,7 @@ import (
|
|||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/square/go-jose.v2/jwt"
|
||||
)
|
||||
|
||||
func getService(a *TestAgent, id string) *structs.NodeService {
|
||||
|
@ -338,7 +341,7 @@ func TestAgent_makeNodeID(t *testing.T) {
|
|||
|
||||
// Turn on host-based IDs and try again. We should get the same ID
|
||||
// each time (and a different one from the random one above).
|
||||
a.Config.DisableHostNodeID = false
|
||||
a.GetConfig().DisableHostNodeID = false
|
||||
id, err = a.makeNodeID()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -2830,10 +2833,10 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
|
|||
|
||||
func TestAgent_Service_Reap(t *testing.T) {
|
||||
// t.Parallel() // timing test. no parallel
|
||||
a := NewTestAgent(t, `
|
||||
a := StartTestAgent(t, TestAgent{Overrides: `
|
||||
check_reap_interval = "50ms"
|
||||
check_deregister_interval_min = "0s"
|
||||
`)
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
|
||||
|
@ -2885,10 +2888,10 @@ func TestAgent_Service_Reap(t *testing.T) {
|
|||
|
||||
func TestAgent_Service_NoReap(t *testing.T) {
|
||||
// t.Parallel() // timing test. no parallel
|
||||
a := NewTestAgent(t, `
|
||||
a := StartTestAgent(t, TestAgent{Overrides: `
|
||||
check_reap_interval = "50ms"
|
||||
check_deregister_interval_min = "0s"
|
||||
`)
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
|
||||
svc := &structs.NodeService{
|
||||
|
@ -3574,7 +3577,7 @@ func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {
|
|||
verify_server_hostname = true
|
||||
`
|
||||
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||
require.NoError(t, a.ReloadConfig(c))
|
||||
require.NoError(t, a.reloadConfigInternal(c))
|
||||
tlsConf = a.tlsConfigurator.OutgoingRPCConfig()
|
||||
require.False(t, tlsConf.InsecureSkipVerify)
|
||||
require.Len(t, tlsConf.RootCAs.Subjects(), 2)
|
||||
|
@ -3604,7 +3607,7 @@ func TestAgent_ReloadConfigAndKeepChecksStatus(t *testing.T) {
|
|||
}
|
||||
|
||||
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||
require.NoError(t, a.ReloadConfig(c))
|
||||
require.NoError(t, a.reloadConfigInternal(c))
|
||||
// After reload, should be passing directly (no critical state)
|
||||
for id, check := range a.State.Checks(nil) {
|
||||
require.Equal(t, "passing", check.Status, "check %q is wrong", id)
|
||||
|
@ -3643,7 +3646,7 @@ func TestAgent_ReloadConfigIncomingRPCConfig(t *testing.T) {
|
|||
verify_server_hostname = true
|
||||
`
|
||||
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||
require.NoError(t, a.ReloadConfig(c))
|
||||
require.NoError(t, a.reloadConfigInternal(c))
|
||||
tlsConf, err = tlsConf.GetConfigForClient(nil)
|
||||
require.NoError(t, err)
|
||||
require.False(t, tlsConf.InsecureSkipVerify)
|
||||
|
@ -3672,7 +3675,7 @@ func TestAgent_ReloadConfigTLSConfigFailure(t *testing.T) {
|
|||
verify_incoming = true
|
||||
`
|
||||
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
|
||||
require.Error(t, a.ReloadConfig(c))
|
||||
require.Error(t, a.reloadConfigInternal(c))
|
||||
tlsConf, err := tlsConf.GetConfigForClient(nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tls.NoClientCert, tlsConf.ClientAuth)
|
||||
|
@ -4546,3 +4549,109 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutoConfig_Integration(t *testing.T) {
|
||||
// eventually this test should really live with integration tests
|
||||
// the goal here is to have one test server and another test client
|
||||
// spin up both agents and allow the server to authorize the auto config
|
||||
// request and then see the client joined
|
||||
|
||||
cfgDir := testutil.TempDir(t, "auto-config")
|
||||
|
||||
// write some test TLS certificates out to the cfg dir
|
||||
cert, key, cacert, err := testTLSCertificates("server.dc1.consul")
|
||||
require.NoError(t, err)
|
||||
|
||||
certFile := filepath.Join(cfgDir, "cert.pem")
|
||||
caFile := filepath.Join(cfgDir, "cacert.pem")
|
||||
keyFile := filepath.Join(cfgDir, "key.pem")
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600))
|
||||
require.NoError(t, ioutil.WriteFile(caFile, []byte(cacert), 0600))
|
||||
require.NoError(t, ioutil.WriteFile(keyFile, []byte(key), 0600))
|
||||
|
||||
// generate a gossip key
|
||||
gossipKey := make([]byte, 32)
|
||||
n, err := rand.Read(gossipKey)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 32, n)
|
||||
gossipKeyEncoded := base64.StdEncoding.EncodeToString(gossipKey)
|
||||
|
||||
// generate the JWT signing keys
|
||||
pub, priv, err := oidcauthtest.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
hclConfig := TestACLConfigWithParams(nil) + `
|
||||
encrypt = "` + gossipKeyEncoded + `"
|
||||
encrypt_verify_incoming = true
|
||||
encrypt_verify_outgoing = true
|
||||
verify_incoming = true
|
||||
verify_outgoing = true
|
||||
verify_server_hostname = true
|
||||
ca_file = "` + caFile + `"
|
||||
cert_file = "` + certFile + `"
|
||||
key_file = "` + keyFile + `"
|
||||
connect { enabled = true }
|
||||
auto_encrypt { allow_tls = true }
|
||||
auto_config {
|
||||
authorizer {
|
||||
enabled = true
|
||||
claim_mappings = {
|
||||
consul_node_name = "node"
|
||||
}
|
||||
claim_assertions = [
|
||||
"value.node == \"${node}\""
|
||||
]
|
||||
bound_issuer = "consul"
|
||||
bound_audiences = [
|
||||
"consul"
|
||||
]
|
||||
jwt_validation_pub_keys = ["` + strings.ReplaceAll(pub, "\n", "\\n") + `"]
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
srv := StartTestAgent(t, TestAgent{Name: "TestAgent-Server", HCL: hclConfig})
|
||||
defer srv.Shutdown()
|
||||
|
||||
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken))
|
||||
|
||||
// sign a JWT token
|
||||
now := time.Now()
|
||||
token, err := oidcauthtest.SignJWT(priv, jwt.Claims{
|
||||
Subject: "consul",
|
||||
Issuer: "consul",
|
||||
Audience: jwt.Audience{"consul"},
|
||||
NotBefore: jwt.NewNumericDate(now.Add(-1 * time.Second)),
|
||||
Expiry: jwt.NewNumericDate(now.Add(5 * time.Minute)),
|
||||
}, map[string]interface{}{
|
||||
"consul_node_name": "test-client",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
client := StartTestAgent(t, TestAgent{Name: "test-client", HCL: TestACLConfigWithParams(nil) + `
|
||||
bootstrap = false
|
||||
server = false
|
||||
ca_file = "` + caFile + `"
|
||||
verify_outgoing = true
|
||||
verify_server_hostname = true
|
||||
node_name = "test-client"
|
||||
ports {
|
||||
server = ` + strconv.Itoa(srv.Config.RPCBindAddr.Port) + `
|
||||
}
|
||||
auto_config {
|
||||
enabled = true
|
||||
intro_token = "` + token + `"
|
||||
server_addresses = ["` + srv.Config.RPCBindAddr.String() + `"]
|
||||
}`})
|
||||
|
||||
defer client.Shutdown()
|
||||
|
||||
// when this is successful we managed to get the gossip key and serf addresses to bind to
|
||||
// and then connect. Additionally we would have to have certificates or else the
|
||||
// verify_incoming config on the server would not let it work.
|
||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1", testrpc.WithToken(TestDefaultMasterToken))
|
||||
|
||||
// spot check that we now have an ACL token
|
||||
require.NotEmpty(t, client.tokens.AgentToken())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,480 @@
|
|||
package autoconf
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/agentpb"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-discover"
|
||||
discoverk8s "github.com/hashicorp/go-discover/provider/k8s"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
const (
|
||||
// autoConfigFileName is the name of the file that the agent auto-config settings are
|
||||
// stored in within the data directory
|
||||
autoConfigFileName = "auto-config.json"
|
||||
)
|
||||
|
||||
// DirectRPC is the interface that needs to be satisifed for AutoConfig to be able to perform
|
||||
// direct RPCs against individual servers. This should not use
|
||||
type DirectRPC interface {
|
||||
RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}) error
|
||||
}
|
||||
|
||||
type options struct {
|
||||
logger hclog.Logger
|
||||
directRPC DirectRPC
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
builderOpts config.BuilderOpts
|
||||
waiter *lib.RetryWaiter
|
||||
overrides []config.Source
|
||||
}
|
||||
|
||||
// Option represents one point of configurability for the New function
|
||||
// when creating a new AutoConfig object
|
||||
type Option func(*options)
|
||||
|
||||
// WithLogger will cause the created AutoConfig type to use the provided logger
|
||||
func WithLogger(logger hclog.Logger) Option {
|
||||
return func(opt *options) {
|
||||
opt.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// WithTLSConfigurator will cause the created AutoConfig type to use the provided configurator
|
||||
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) Option {
|
||||
return func(opt *options) {
|
||||
opt.tlsConfigurator = tlsConfigurator
|
||||
}
|
||||
}
|
||||
|
||||
// WithConnectionPool will cause the created AutoConfig type to use the provided connection pool
|
||||
func WithDirectRPC(directRPC DirectRPC) Option {
|
||||
return func(opt *options) {
|
||||
opt.directRPC = directRPC
|
||||
}
|
||||
}
|
||||
|
||||
// WithBuilderOpts will cause the created AutoConfig type to use the provided CLI builderOpts
|
||||
func WithBuilderOpts(builderOpts config.BuilderOpts) Option {
|
||||
return func(opt *options) {
|
||||
opt.builderOpts = builderOpts
|
||||
}
|
||||
}
|
||||
|
||||
// WithRetryWaiter will cause the created AutoConfig type to use the provided retry waiter
|
||||
func WithRetryWaiter(waiter *lib.RetryWaiter) Option {
|
||||
return func(opt *options) {
|
||||
opt.waiter = waiter
|
||||
}
|
||||
}
|
||||
|
||||
// WithOverrides is used to provide a config source to append to the tail sources
|
||||
// during config building. It is really only useful for testing to tune non-user
|
||||
// configurable tunables to make various tests converge more quickly than they
|
||||
// could otherwise.
|
||||
func WithOverrides(overrides ...config.Source) Option {
|
||||
return func(opt *options) {
|
||||
opt.overrides = overrides
|
||||
}
|
||||
}
|
||||
|
||||
// AutoConfig is all the state necessary for being able to parse a configuration
|
||||
// as well as perform the necessary RPCs to perform Agent Auto Configuration.
|
||||
//
|
||||
// NOTE: This struct and methods on it are not currently thread/goroutine safe.
|
||||
// However it doesn't spawn any of its own go routines yet and is used in a
|
||||
// synchronous fashion. In the future if either of those two conditions change
|
||||
// then we will need to add some locking here. I am deferring that for now
|
||||
// to help ease the review of this already large PR.
|
||||
type AutoConfig struct {
|
||||
config *config.RuntimeConfig
|
||||
builderOpts config.BuilderOpts
|
||||
logger hclog.Logger
|
||||
directRPC DirectRPC
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
autoConfigData string
|
||||
waiter *lib.RetryWaiter
|
||||
overrides []config.Source
|
||||
}
|
||||
|
||||
func flattenOptions(opts []Option) options {
|
||||
var flat options
|
||||
for _, opt := range opts {
|
||||
opt(&flat)
|
||||
}
|
||||
return flat
|
||||
}
|
||||
|
||||
// New creates a new AutoConfig object for providing automatic
|
||||
// Consul configuration.
|
||||
func New(options ...Option) (*AutoConfig, error) {
|
||||
flat := flattenOptions(options)
|
||||
|
||||
if flat.directRPC == nil {
|
||||
return nil, fmt.Errorf("must provide a direct RPC delegate")
|
||||
}
|
||||
|
||||
if flat.tlsConfigurator == nil {
|
||||
return nil, fmt.Errorf("must provide a TLS configurator")
|
||||
}
|
||||
|
||||
logger := flat.logger
|
||||
if logger == nil {
|
||||
logger = hclog.NewNullLogger()
|
||||
} else {
|
||||
logger = logger.Named(logging.AutoConfig)
|
||||
}
|
||||
|
||||
waiter := flat.waiter
|
||||
if waiter == nil {
|
||||
waiter = lib.NewRetryWaiter(1, 0, 10*time.Minute, lib.NewJitterRandomStagger(25))
|
||||
}
|
||||
|
||||
ac := &AutoConfig{
|
||||
builderOpts: flat.builderOpts,
|
||||
logger: logger,
|
||||
directRPC: flat.directRPC,
|
||||
tlsConfigurator: flat.tlsConfigurator,
|
||||
waiter: waiter,
|
||||
overrides: flat.overrides,
|
||||
}
|
||||
|
||||
return ac, nil
|
||||
}
|
||||
|
||||
// LoadConfig will build the configuration including the extraHead source injected
|
||||
// after all other defaults but before any user supplied configuration and the overrides
|
||||
// source injected as the final source in the configuration parsing chain.
|
||||
func LoadConfig(builderOpts config.BuilderOpts, extraHead config.Source, overrides ...config.Source) (*config.RuntimeConfig, []string, error) {
|
||||
b, err := config.NewBuilder(builderOpts)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if extraHead.Data != "" {
|
||||
b.Head = append(b.Head, extraHead)
|
||||
}
|
||||
|
||||
if len(overrides) != 0 {
|
||||
b.Tail = append(b.Tail, overrides...)
|
||||
}
|
||||
|
||||
cfg, err := b.BuildAndValidate()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return &cfg, b.Warnings, nil
|
||||
}
|
||||
|
||||
// ReadConfig will parse the current configuration and inject any
|
||||
// auto-config sources if present into the correct place in the parsing chain.
|
||||
func (ac *AutoConfig) ReadConfig() (*config.RuntimeConfig, error) {
|
||||
src := config.Source{
|
||||
Name: autoConfigFileName,
|
||||
Format: "json",
|
||||
Data: ac.autoConfigData,
|
||||
}
|
||||
|
||||
cfg, warnings, err := LoadConfig(ac.builderOpts, src, ac.overrides...)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
for _, w := range warnings {
|
||||
ac.logger.Warn(w)
|
||||
}
|
||||
|
||||
ac.config = cfg
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// restorePersistedAutoConfig will attempt to load the persisted auto-config
|
||||
// settings from the data directory. It returns true either when there was an
|
||||
// unrecoverable error or when the configuration was successfully loaded from
|
||||
// disk. Recoverable errors, such as "file not found" are suppressed and this
|
||||
// method will return false for the first boolean.
|
||||
func (ac *AutoConfig) restorePersistedAutoConfig() (bool, error) {
|
||||
if ac.config.DataDir == "" {
|
||||
// no data directory means we don't have anything to potentially load
|
||||
return false, nil
|
||||
}
|
||||
|
||||
path := filepath.Join(ac.config.DataDir, autoConfigFileName)
|
||||
ac.logger.Debug("attempting to restore any persisted configuration", "path", path)
|
||||
|
||||
content, err := ioutil.ReadFile(path)
|
||||
if err == nil {
|
||||
ac.logger.Info("restored persisted configuration", "path", path)
|
||||
ac.autoConfigData = string(content)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if !os.IsNotExist(err) {
|
||||
return true, fmt.Errorf("failed to load %s: %w", path, err)
|
||||
}
|
||||
|
||||
// ignore non-existence errors as that is an indicator that we haven't
|
||||
// performed the auto configuration before
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// InitialConfiguration will perform a one-time RPC request to the configured servers
|
||||
// to retrieve various cluster wide configurations. See the agent/agentpb/auto_config.proto
|
||||
// file for a complete reference of what configurations can be applied in this manner.
|
||||
// The returned configuration will be the new configuration with any auto-config settings
|
||||
// already applied. If AutoConfig is not enabled this method will just parse any
|
||||
// local configuration and return the built runtime configuration.
|
||||
//
|
||||
// The context passed in can be used to cancel the retrieval of the initial configuration
|
||||
// like when receiving a signal during startup.
|
||||
func (ac *AutoConfig) InitialConfiguration(ctx context.Context) (*config.RuntimeConfig, error) {
|
||||
if ac.config == nil {
|
||||
config, err := ac.ReadConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ac.config = config
|
||||
}
|
||||
|
||||
if !ac.config.AutoConfig.Enabled {
|
||||
return ac.config, nil
|
||||
}
|
||||
|
||||
ready, err := ac.restorePersistedAutoConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !ready {
|
||||
if err := ac.getInitialConfiguration(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// re-read the configuration now that we have our initial auto-config
|
||||
config, err := ac.ReadConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ac.config = config
|
||||
return ac.config, nil
|
||||
}
|
||||
|
||||
// introToken is responsible for determining the correct intro token to use
|
||||
// when making the initial Cluster.AutoConfig RPC request.
|
||||
func (ac *AutoConfig) introToken() (string, error) {
|
||||
conf := ac.config.AutoConfig
|
||||
// without an intro token or intro token file we cannot do anything
|
||||
if conf.IntroToken == "" && conf.IntroTokenFile == "" {
|
||||
return "", fmt.Errorf("neither intro_token or intro_token_file settings are not configured")
|
||||
}
|
||||
|
||||
token := conf.IntroToken
|
||||
if token == "" {
|
||||
// load the intro token from the file
|
||||
content, err := ioutil.ReadFile(conf.IntroTokenFile)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Failed to read intro token from file: %w", err)
|
||||
}
|
||||
|
||||
token = string(content)
|
||||
|
||||
if token == "" {
|
||||
return "", fmt.Errorf("intro_token_file did not contain any token")
|
||||
}
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
// autoConfigHosts is responsible for taking the list of server addresses and
|
||||
// resolving any go-discover provider invocations. It will then return a list
|
||||
// of hosts. These might be hostnames and is expected that DNS resolution may
|
||||
// be performed after this function runs. Additionally these may contain ports
|
||||
// so SplitHostPort could also be necessary.
|
||||
func (ac *AutoConfig) autoConfigHosts() ([]string, error) {
|
||||
servers := ac.config.AutoConfig.ServerAddresses
|
||||
|
||||
providers := make(map[string]discover.Provider)
|
||||
for k, v := range discover.Providers {
|
||||
providers[k] = v
|
||||
}
|
||||
providers["k8s"] = &discoverk8s.Provider{}
|
||||
|
||||
disco, err := discover.New(
|
||||
discover.WithUserAgent(lib.UserAgent()),
|
||||
discover.WithProviders(providers),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create go-discover resolver: %w", err)
|
||||
}
|
||||
|
||||
var addrs []string
|
||||
for _, addr := range servers {
|
||||
switch {
|
||||
case strings.Contains(addr, "provider="):
|
||||
resolved, err := disco.Addrs(addr, ac.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}))
|
||||
if err != nil {
|
||||
ac.logger.Error("failed to resolve go-discover auto-config servers", "configuration", addr, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
addrs = append(addrs, resolved...)
|
||||
ac.logger.Debug("discovered auto-config servers", "servers", resolved)
|
||||
default:
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
}
|
||||
|
||||
if len(addrs) == 0 {
|
||||
return nil, fmt.Errorf("no auto-config server addresses available for use")
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
// resolveHost will take a single host string and convert it to a list of TCPAddrs
|
||||
// This will process any port in the input as well as looking up the hostname using
|
||||
// normal DNS resolution.
|
||||
func (ac *AutoConfig) resolveHost(hostPort string) []net.TCPAddr {
|
||||
port := ac.config.ServerPort
|
||||
host, portStr, err := net.SplitHostPort(hostPort)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "missing port in address") {
|
||||
host = hostPort
|
||||
} else {
|
||||
ac.logger.Warn("error splitting host address into IP and port", "address", hostPort, "error", err)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
port, err = strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
ac.logger.Warn("Parsed port is not an integer", "port", portStr, "error", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// resolve the host to a list of IPs
|
||||
ips, err := net.LookupIP(host)
|
||||
if err != nil {
|
||||
ac.logger.Warn("IP resolution failed", "host", host, "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var addrs []net.TCPAddr
|
||||
for _, ip := range ips {
|
||||
addrs = append(addrs, net.TCPAddr{IP: ip, Port: port})
|
||||
}
|
||||
|
||||
return addrs
|
||||
}
|
||||
|
||||
// recordAutoConfigReply takes an AutoConfig RPC reply records it with the agent
|
||||
// This will persist the configuration to disk (unless in dev mode running without
|
||||
// a data dir) and will reload the configuration.
|
||||
func (ac *AutoConfig) recordAutoConfigReply(reply *agentpb.AutoConfigResponse) error {
|
||||
conf, err := json.Marshal(translateConfig(reply.Config))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode auto-config configuration as JSON: %w", err)
|
||||
}
|
||||
|
||||
ac.autoConfigData = string(conf)
|
||||
|
||||
if ac.config.DataDir == "" {
|
||||
ac.logger.Debug("not persisting auto-config settings because there is no data directory")
|
||||
return nil
|
||||
}
|
||||
|
||||
path := filepath.Join(ac.config.DataDir, autoConfigFileName)
|
||||
|
||||
err = ioutil.WriteFile(path, conf, 0660)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write auto-config configurations: %w", err)
|
||||
}
|
||||
|
||||
ac.logger.Debug("auto-config settings were persisted to disk")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getInitialConfigurationOnce will perform full server to TCPAddr resolution and
|
||||
// loop through each host trying to make the Cluster.AutoConfig RPC call. When
|
||||
// successful the bool return will be true and the err value will indicate whether we
|
||||
// successfully recorded the auto config settings (persisted to disk and stored internally
|
||||
// on the AutoConfig object)
|
||||
func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context) (bool, error) {
|
||||
token, err := ac.introToken()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
request := agentpb.AutoConfigRequest{
|
||||
Datacenter: ac.config.Datacenter,
|
||||
Node: ac.config.NodeName,
|
||||
Segment: ac.config.SegmentName,
|
||||
JWT: token,
|
||||
}
|
||||
|
||||
var reply agentpb.AutoConfigResponse
|
||||
|
||||
servers, err := ac.autoConfigHosts()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, s := range servers {
|
||||
// try each IP to see if we can successfully make the request
|
||||
for _, addr := range ac.resolveHost(s) {
|
||||
if ctx.Err() != nil {
|
||||
return false, ctx.Err()
|
||||
}
|
||||
|
||||
ac.logger.Debug("Making Cluster.AutoConfig RPC", "addr", addr.String())
|
||||
if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "Cluster.AutoConfig", &request, &reply); err != nil {
|
||||
ac.logger.Error("AutoConfig RPC failed", "addr", addr.String(), "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
return true, ac.recordAutoConfigReply(&reply)
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// getInitialConfiguration implements a loop to retry calls to getInitialConfigurationOnce.
|
||||
// It uses the RetryWaiter on the AutoConfig object to control how often to attempt
|
||||
// the initial configuration process. It is also canceallable by cancelling the provided context.
|
||||
func (ac *AutoConfig) getInitialConfiguration(ctx context.Context) error {
|
||||
// this resets the failures so that we will perform immediate request
|
||||
wait := ac.waiter.Success()
|
||||
for {
|
||||
select {
|
||||
case <-wait:
|
||||
if done, err := ac.getInitialConfigurationOnce(ctx); done {
|
||||
return err
|
||||
}
|
||||
wait = ac.waiter.Failed()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,397 @@
|
|||
package autoconf
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/agentpb"
|
||||
pbconfig "github.com/hashicorp/consul/agent/agentpb/config"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockDirectRPC struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockDirectRPC) RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}) error {
|
||||
retValues := m.Called(dc, node, addr, method, args, reply)
|
||||
switch ret := retValues.Get(0).(type) {
|
||||
case error:
|
||||
return ret
|
||||
case func(interface{}):
|
||||
ret(reply)
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("This should not happen, update mock direct rpc expectations")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
type testCase struct {
|
||||
opts []Option
|
||||
err string
|
||||
validate func(t *testing.T, ac *AutoConfig)
|
||||
}
|
||||
|
||||
cases := map[string]testCase{
|
||||
"no-direct-rpc": {
|
||||
opts: []Option{
|
||||
WithTLSConfigurator(&tlsutil.Configurator{}),
|
||||
},
|
||||
err: "must provide a direct RPC delegate",
|
||||
},
|
||||
"no-tls-configurator": {
|
||||
opts: []Option{
|
||||
WithDirectRPC(&mockDirectRPC{}),
|
||||
},
|
||||
err: "must provide a TLS configurator",
|
||||
},
|
||||
"ok": {
|
||||
opts: []Option{
|
||||
WithTLSConfigurator(&tlsutil.Configurator{}),
|
||||
WithDirectRPC(&mockDirectRPC{}),
|
||||
},
|
||||
validate: func(t *testing.T, ac *AutoConfig) {
|
||||
t.Helper()
|
||||
require.NotNil(t, ac.logger)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
ac, err := New(tcase.opts...)
|
||||
if tcase.err != "" {
|
||||
testutil.RequireErrorContains(t, err, tcase.err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
if tcase.validate != nil {
|
||||
tcase.validate(t, ac)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
// Basically just testing that injection of the extra
|
||||
// source works.
|
||||
devMode := true
|
||||
builderOpts := config.BuilderOpts{
|
||||
// putting this in dev mode so that the config validates
|
||||
// without having to specify a data directory
|
||||
DevMode: &devMode,
|
||||
}
|
||||
|
||||
cfg, warnings, err := LoadConfig(builderOpts, config.Source{
|
||||
Name: "test",
|
||||
Format: "hcl",
|
||||
Data: `node_name = "hobbiton"`,
|
||||
},
|
||||
config.Source{
|
||||
Name: "overrides",
|
||||
Format: "json",
|
||||
Data: `{"check_reap_interval": "1ms"}`,
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, warnings)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "hobbiton", cfg.NodeName)
|
||||
require.Equal(t, 1*time.Millisecond, cfg.CheckReapInterval)
|
||||
}
|
||||
|
||||
func TestReadConfig(t *testing.T) {
|
||||
// just testing that some auto config source gets injected
|
||||
devMode := true
|
||||
ac := AutoConfig{
|
||||
autoConfigData: `{"node_name": "hobbiton"}`,
|
||||
builderOpts: config.BuilderOpts{
|
||||
// putting this in dev mode so that the config validates
|
||||
// without having to specify a data directory
|
||||
DevMode: &devMode,
|
||||
},
|
||||
logger: testutil.Logger(t),
|
||||
}
|
||||
|
||||
cfg, err := ac.ReadConfig()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "hobbiton", cfg.NodeName)
|
||||
require.Same(t, ac.config, cfg)
|
||||
}
|
||||
|
||||
func testSetupAutoConf(t *testing.T) (string, string, config.BuilderOpts) {
|
||||
t.Helper()
|
||||
|
||||
// create top level directory to hold both config and data
|
||||
tld := testutil.TempDir(t, "auto-config")
|
||||
t.Cleanup(func() { os.RemoveAll(tld) })
|
||||
|
||||
// create the data directory
|
||||
dataDir := filepath.Join(tld, "data")
|
||||
require.NoError(t, os.Mkdir(dataDir, 0700))
|
||||
|
||||
// create the config directory
|
||||
configDir := filepath.Join(tld, "config")
|
||||
require.NoError(t, os.Mkdir(configDir, 0700))
|
||||
|
||||
builderOpts := config.BuilderOpts{
|
||||
HCL: []string{
|
||||
`data_dir = "` + dataDir + `"`,
|
||||
`datacenter = "dc1"`,
|
||||
`node_name = "autoconf"`,
|
||||
`bind_addr = "127.0.0.1"`,
|
||||
},
|
||||
}
|
||||
|
||||
return dataDir, configDir, builderOpts
|
||||
}
|
||||
|
||||
func TestInitialConfiguration_disabled(t *testing.T) {
|
||||
dataDir, configDir, builderOpts := testSetupAutoConf(t)
|
||||
|
||||
cfgFile := filepath.Join(configDir, "test.json")
|
||||
require.NoError(t, ioutil.WriteFile(cfgFile, []byte(`{
|
||||
"primary_datacenter": "primary",
|
||||
"auto_config": {"enabled": false}
|
||||
}`), 0600))
|
||||
|
||||
builderOpts.ConfigFiles = append(builderOpts.ConfigFiles, cfgFile)
|
||||
|
||||
directRPC := mockDirectRPC{}
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
||||
cfg, err := ac.InitialConfiguration(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "primary", cfg.PrimaryDatacenter)
|
||||
require.NoFileExists(t, filepath.Join(dataDir, autoConfigFileName))
|
||||
|
||||
// ensure no RPC was made
|
||||
directRPC.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestInitialConfiguration_cancelled(t *testing.T) {
|
||||
_, configDir, builderOpts := testSetupAutoConf(t)
|
||||
|
||||
cfgFile := filepath.Join(configDir, "test.json")
|
||||
require.NoError(t, ioutil.WriteFile(cfgFile, []byte(`{
|
||||
"primary_datacenter": "primary",
|
||||
"auto_config": {"enabled": true, "intro_token": "blarg", "server_addresses": ["127.0.0.1:8300"]}
|
||||
}`), 0600))
|
||||
|
||||
builderOpts.ConfigFiles = append(builderOpts.ConfigFiles, cfgFile)
|
||||
|
||||
directRPC := mockDirectRPC{}
|
||||
|
||||
expectedRequest := agentpb.AutoConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "autoconf",
|
||||
JWT: "blarg",
|
||||
}
|
||||
|
||||
directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "Cluster.AutoConfig", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0)
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(100*time.Millisecond))
|
||||
defer cancelFn()
|
||||
|
||||
cfg, err := ac.InitialConfiguration(ctx)
|
||||
testutil.RequireErrorContains(t, err, context.DeadlineExceeded.Error())
|
||||
require.Nil(t, cfg)
|
||||
|
||||
// ensure no RPC was made
|
||||
directRPC.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestInitialConfiguration_restored(t *testing.T) {
|
||||
dataDir, configDir, builderOpts := testSetupAutoConf(t)
|
||||
|
||||
cfgFile := filepath.Join(configDir, "test.json")
|
||||
require.NoError(t, ioutil.WriteFile(cfgFile, []byte(`{
|
||||
"auto_config": {"enabled": true, "intro_token": "blarg", "server_addresses": ["127.0.0.1:8300"]}
|
||||
}`), 0600))
|
||||
|
||||
builderOpts.ConfigFiles = append(builderOpts.ConfigFiles, cfgFile)
|
||||
|
||||
// persist an auto config response to the data dir where it is expected
|
||||
persistedFile := filepath.Join(dataDir, autoConfigFileName)
|
||||
response := &pbconfig.Config{
|
||||
PrimaryDatacenter: "primary",
|
||||
}
|
||||
data, err := json.Marshal(translateConfig(response))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, ioutil.WriteFile(persistedFile, data, 0600))
|
||||
|
||||
directRPC := mockDirectRPC{}
|
||||
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
||||
cfg, err := ac.InitialConfiguration(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "primary", cfg.PrimaryDatacenter)
|
||||
|
||||
// ensure no RPC was made
|
||||
directRPC.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestInitialConfiguration_success(t *testing.T) {
|
||||
dataDir, configDir, builderOpts := testSetupAutoConf(t)
|
||||
|
||||
cfgFile := filepath.Join(configDir, "test.json")
|
||||
require.NoError(t, ioutil.WriteFile(cfgFile, []byte(`{
|
||||
"auto_config": {"enabled": true, "intro_token": "blarg", "server_addresses": ["127.0.0.1:8300"]}
|
||||
}`), 0600))
|
||||
|
||||
builderOpts.ConfigFiles = append(builderOpts.ConfigFiles, cfgFile)
|
||||
|
||||
persistedFile := filepath.Join(dataDir, autoConfigFileName)
|
||||
directRPC := mockDirectRPC{}
|
||||
|
||||
populateResponse := func(val interface{}) {
|
||||
resp, ok := val.(*agentpb.AutoConfigResponse)
|
||||
require.True(t, ok)
|
||||
resp.Config = &pbconfig.Config{
|
||||
PrimaryDatacenter: "primary",
|
||||
}
|
||||
}
|
||||
|
||||
expectedRequest := agentpb.AutoConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "autoconf",
|
||||
JWT: "blarg",
|
||||
}
|
||||
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(populateResponse)
|
||||
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
||||
cfg, err := ac.InitialConfiguration(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "primary", cfg.PrimaryDatacenter)
|
||||
|
||||
// the file was written to.
|
||||
require.FileExists(t, persistedFile)
|
||||
|
||||
// ensure no RPC was made
|
||||
directRPC.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestInitialConfiguration_retries(t *testing.T) {
|
||||
dataDir, configDir, builderOpts := testSetupAutoConf(t)
|
||||
|
||||
cfgFile := filepath.Join(configDir, "test.json")
|
||||
require.NoError(t, ioutil.WriteFile(cfgFile, []byte(`{
|
||||
"auto_config": {"enabled": true, "intro_token": "blarg", "server_addresses": ["198.18.0.1", "198.18.0.2:8398", "198.18.0.3:8399", "127.0.0.1:1234"]}
|
||||
}`), 0600))
|
||||
|
||||
builderOpts.ConfigFiles = append(builderOpts.ConfigFiles, cfgFile)
|
||||
|
||||
persistedFile := filepath.Join(dataDir, autoConfigFileName)
|
||||
directRPC := mockDirectRPC{}
|
||||
|
||||
populateResponse := func(val interface{}) {
|
||||
resp, ok := val.(*agentpb.AutoConfigResponse)
|
||||
require.True(t, ok)
|
||||
resp.Config = &pbconfig.Config{
|
||||
PrimaryDatacenter: "primary",
|
||||
}
|
||||
}
|
||||
|
||||
expectedRequest := agentpb.AutoConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "autoconf",
|
||||
JWT: "blarg",
|
||||
}
|
||||
|
||||
// basically the 198.18.0.* addresses should fail indefinitely. the first time through the
|
||||
// outer loop we inject a failure for the DNS resolution of localhost to 127.0.0.1. Then
|
||||
// the second time through the outer loop we allow the localhost one to work.
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 2), Port: 8398},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 3), Port: 8399},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Once()
|
||||
directRPC.On(
|
||||
"RPC",
|
||||
"dc1",
|
||||
"autoconf",
|
||||
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
|
||||
"Cluster.AutoConfig",
|
||||
&expectedRequest,
|
||||
&agentpb.AutoConfigResponse{}).Return(populateResponse)
|
||||
|
||||
waiter := lib.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
||||
ac, err := New(WithBuilderOpts(builderOpts), WithTLSConfigurator(&tlsutil.Configurator{}), WithDirectRPC(&directRPC), WithRetryWaiter(waiter))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ac)
|
||||
|
||||
cfg, err := ac.InitialConfiguration(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cfg)
|
||||
require.Equal(t, "primary", cfg.PrimaryDatacenter)
|
||||
|
||||
// the file was written to.
|
||||
require.FileExists(t, persistedFile)
|
||||
|
||||
// ensure no RPC was made
|
||||
directRPC.AssertExpectations(t)
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
package autoconf
|
||||
|
||||
import (
|
||||
pbconfig "github.com/hashicorp/consul/agent/agentpb/config"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
)
|
||||
|
||||
// translateAgentConfig is meant to take in a agent/agentpb/config.Config type
|
||||
// and craft the corresponding agent/config.Config type. The need for this function
|
||||
// should eventually be removed with the protobuf and normal version converging.
|
||||
// In the meantime, its not desirable to have the flatter Config struct in protobufs
|
||||
// as in the long term we want a configuration with more nested groupings.
|
||||
//
|
||||
// Why is this function not in the agent/agentpb/config package? The answer, that
|
||||
// package cannot import the agent/config package without running into import cycles.
|
||||
func translateConfig(c *pbconfig.Config) *config.Config {
|
||||
out := config.Config{
|
||||
Datacenter: &c.Datacenter,
|
||||
PrimaryDatacenter: &c.PrimaryDatacenter,
|
||||
NodeName: &c.NodeName,
|
||||
SegmentName: &c.SegmentName,
|
||||
}
|
||||
|
||||
// Translate Auto Encrypt settings
|
||||
if a := c.AutoEncrypt; a != nil {
|
||||
out.AutoEncrypt = config.AutoEncrypt{
|
||||
TLS: &a.TLS,
|
||||
DNSSAN: a.DNSSAN,
|
||||
IPSAN: a.IPSAN,
|
||||
AllowTLS: &a.AllowTLS,
|
||||
}
|
||||
}
|
||||
|
||||
// Translate all the ACL settings
|
||||
if a := c.ACL; a != nil {
|
||||
out.ACL = config.ACL{
|
||||
Enabled: &a.Enabled,
|
||||
PolicyTTL: &a.PolicyTTL,
|
||||
RoleTTL: &a.RoleTTL,
|
||||
TokenTTL: &a.TokenTTL,
|
||||
DownPolicy: &a.DownPolicy,
|
||||
DefaultPolicy: &a.DefaultPolicy,
|
||||
EnableKeyListPolicy: &a.EnableKeyListPolicy,
|
||||
DisabledTTL: &a.DisabledTTL,
|
||||
EnableTokenPersistence: &a.EnableTokenPersistence,
|
||||
MSPDisableBootstrap: &a.MSPDisableBootstrap,
|
||||
}
|
||||
|
||||
if t := c.ACL.Tokens; t != nil {
|
||||
var tokens []config.ServiceProviderToken
|
||||
|
||||
// create the slice of msp tokens if any
|
||||
for _, mspToken := range t.ManagedServiceProvider {
|
||||
tokens = append(tokens, config.ServiceProviderToken{
|
||||
AccessorID: &mspToken.AccessorID,
|
||||
SecretID: &mspToken.SecretID,
|
||||
})
|
||||
}
|
||||
|
||||
out.ACL.Tokens = config.Tokens{
|
||||
Master: &t.Master,
|
||||
Replication: &t.Replication,
|
||||
AgentMaster: &t.AgentMaster,
|
||||
Default: &t.Default,
|
||||
Agent: &t.Agent,
|
||||
ManagedServiceProvider: tokens,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Translate the Gossip settings
|
||||
if g := c.Gossip; g != nil {
|
||||
out.RetryJoinLAN = g.RetryJoinLAN
|
||||
|
||||
// Translate the Gossip Encryption settings
|
||||
if e := c.Gossip.Encryption; e != nil {
|
||||
out.EncryptKey = &e.Key
|
||||
out.EncryptVerifyIncoming = &e.VerifyIncoming
|
||||
out.EncryptVerifyOutgoing = &e.VerifyOutgoing
|
||||
}
|
||||
}
|
||||
|
||||
// Translate the Generic TLS settings
|
||||
if t := c.TLS; t != nil {
|
||||
out.VerifyOutgoing = &t.VerifyOutgoing
|
||||
out.VerifyServerHostname = &t.VerifyServerHostname
|
||||
out.TLSMinVersion = &t.MinVersion
|
||||
out.TLSCipherSuites = &t.CipherSuites
|
||||
out.TLSPreferServerCipherSuites = &t.PreferServerCipherSuites
|
||||
}
|
||||
|
||||
return &out
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package autoconf
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
pbconfig "github.com/hashicorp/consul/agent/agentpb/config"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func stringPointer(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func boolPointer(b bool) *bool {
|
||||
return &b
|
||||
}
|
||||
|
||||
func TestConfig_translateConfig(t *testing.T) {
|
||||
original := pbconfig.Config{
|
||||
Datacenter: "abc",
|
||||
PrimaryDatacenter: "def",
|
||||
NodeName: "ghi",
|
||||
SegmentName: "jkl",
|
||||
ACL: &pbconfig.ACL{
|
||||
Enabled: true,
|
||||
PolicyTTL: "1s",
|
||||
RoleTTL: "2s",
|
||||
TokenTTL: "3s",
|
||||
DownPolicy: "deny",
|
||||
DefaultPolicy: "deny",
|
||||
EnableKeyListPolicy: true,
|
||||
DisabledTTL: "4s",
|
||||
EnableTokenPersistence: true,
|
||||
MSPDisableBootstrap: false,
|
||||
Tokens: &pbconfig.ACLTokens{
|
||||
Master: "99e7e490-6baf-43fc-9010-78b6aa9a6813",
|
||||
Replication: "51308d40-465c-4ac6-a636-7c0747edec89",
|
||||
AgentMaster: "e012e1ea-78a2-41cc-bc8b-231a44196f39",
|
||||
Default: "8781a3f5-de46-4b45-83e1-c92f4cfd0332",
|
||||
Agent: "ddb8f1b0-8a99-4032-b601-87926bce244e",
|
||||
ManagedServiceProvider: []*pbconfig.ACLServiceProviderToken{
|
||||
{
|
||||
AccessorID: "23f37987-7b9e-4e5b-acae-dbc9bc137bae",
|
||||
SecretID: "e28b820a-438e-4e2b-ad24-fe59e6a4914f",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
AutoEncrypt: &pbconfig.AutoEncrypt{
|
||||
TLS: true,
|
||||
DNSSAN: []string{"dns"},
|
||||
IPSAN: []string{"198.18.0.1"},
|
||||
AllowTLS: false,
|
||||
},
|
||||
Gossip: &pbconfig.Gossip{
|
||||
RetryJoinLAN: []string{"10.0.0.1"},
|
||||
Encryption: &pbconfig.GossipEncryption{
|
||||
Key: "blarg",
|
||||
VerifyOutgoing: true,
|
||||
VerifyIncoming: true,
|
||||
},
|
||||
},
|
||||
TLS: &pbconfig.TLS{
|
||||
VerifyOutgoing: true,
|
||||
VerifyServerHostname: true,
|
||||
CipherSuites: "stuff",
|
||||
MinVersion: "tls13",
|
||||
PreferServerCipherSuites: true,
|
||||
},
|
||||
}
|
||||
|
||||
expected := &config.Config{
|
||||
Datacenter: stringPointer("abc"),
|
||||
PrimaryDatacenter: stringPointer("def"),
|
||||
NodeName: stringPointer("ghi"),
|
||||
SegmentName: stringPointer("jkl"),
|
||||
RetryJoinLAN: []string{"10.0.0.1"},
|
||||
EncryptKey: stringPointer("blarg"),
|
||||
EncryptVerifyIncoming: boolPointer(true),
|
||||
EncryptVerifyOutgoing: boolPointer(true),
|
||||
VerifyOutgoing: boolPointer(true),
|
||||
VerifyServerHostname: boolPointer(true),
|
||||
TLSCipherSuites: stringPointer("stuff"),
|
||||
TLSMinVersion: stringPointer("tls13"),
|
||||
TLSPreferServerCipherSuites: boolPointer(true),
|
||||
ACL: config.ACL{
|
||||
Enabled: boolPointer(true),
|
||||
PolicyTTL: stringPointer("1s"),
|
||||
RoleTTL: stringPointer("2s"),
|
||||
TokenTTL: stringPointer("3s"),
|
||||
DownPolicy: stringPointer("deny"),
|
||||
DefaultPolicy: stringPointer("deny"),
|
||||
EnableKeyListPolicy: boolPointer(true),
|
||||
DisabledTTL: stringPointer("4s"),
|
||||
EnableTokenPersistence: boolPointer(true),
|
||||
MSPDisableBootstrap: boolPointer(false),
|
||||
Tokens: config.Tokens{
|
||||
Master: stringPointer("99e7e490-6baf-43fc-9010-78b6aa9a6813"),
|
||||
Replication: stringPointer("51308d40-465c-4ac6-a636-7c0747edec89"),
|
||||
AgentMaster: stringPointer("e012e1ea-78a2-41cc-bc8b-231a44196f39"),
|
||||
Default: stringPointer("8781a3f5-de46-4b45-83e1-c92f4cfd0332"),
|
||||
Agent: stringPointer("ddb8f1b0-8a99-4032-b601-87926bce244e"),
|
||||
ManagedServiceProvider: []config.ServiceProviderToken{
|
||||
{
|
||||
AccessorID: stringPointer("23f37987-7b9e-4e5b-acae-dbc9bc137bae"),
|
||||
SecretID: stringPointer("e28b820a-438e-4e2b-ad24-fe59e6a4914f"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
AutoEncrypt: config.AutoEncrypt{
|
||||
TLS: boolPointer(true),
|
||||
DNSSAN: []string{"dns"},
|
||||
IPSAN: []string{"198.18.0.1"},
|
||||
AllowTLS: boolPointer(false),
|
||||
},
|
||||
}
|
||||
|
||||
actual := translateConfig(&original)
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
|
@ -906,6 +906,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
ConnectMeshGatewayWANFederationEnabled: connectMeshGatewayWANFederationEnabled,
|
||||
ConnectSidecarMinPort: sidecarMinPort,
|
||||
ConnectSidecarMaxPort: sidecarMaxPort,
|
||||
ConnectTestCALeafRootChangeSpread: b.durationVal("connect.test_ca_leaf_root_change_spread", c.Connect.TestCALeafRootChangeSpread),
|
||||
ExposeMinPort: exposeMinPort,
|
||||
ExposeMaxPort: exposeMaxPort,
|
||||
DataDir: b.stringVal(c.DataDir),
|
||||
|
|
|
@ -513,6 +513,11 @@ type Connect struct {
|
|||
CAProvider *string `json:"ca_provider,omitempty" hcl:"ca_provider" mapstructure:"ca_provider"`
|
||||
CAConfig map[string]interface{} `json:"ca_config,omitempty" hcl:"ca_config" mapstructure:"ca_config"`
|
||||
MeshGatewayWANFederationEnabled *bool `json:"enable_mesh_gateway_wan_federation" hcl:"enable_mesh_gateway_wan_federation" mapstructure:"enable_mesh_gateway_wan_federation"`
|
||||
|
||||
// TestCALeafRootChangeSpread controls how long after a CA roots change before new leaft certs will be generated.
|
||||
// This is only tuned in tests, generally set to 1ns to make tests deterministic with when to expect updated leaf
|
||||
// certs by. This configuration is not exposed to users (not documented, and agent/config/default.go will override it)
|
||||
TestCALeafRootChangeSpread *string `json:"test_ca_leaf_root_change_spread,omitempty" hcl:"test_ca_leaf_root_change_spread" mapstructure:"test_ca_leaf_root_change_spread"`
|
||||
}
|
||||
|
||||
// SOA is the configuration of SOA for DNS
|
||||
|
|
|
@ -190,6 +190,12 @@ func NonUserSource() Source {
|
|||
|
||||
# SegmentNameLimit is the maximum segment name length.
|
||||
segment_name_limit = 64
|
||||
|
||||
connect = {
|
||||
# 0s causes the value to be ignored and operate without capping
|
||||
# the max time before leaf certs can be generated after a roots change.
|
||||
test_ca_leaf_root_change_spread = "0s"
|
||||
}
|
||||
`,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,7 +101,13 @@ func NewClient(config *Config) (*Client, error) {
|
|||
return NewClientLogger(config, nil, c)
|
||||
}
|
||||
|
||||
func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) {
|
||||
func NewClientWithOptions(config *Config, options ...ConsulOption) (*Client, error) {
|
||||
flat := flattenConsulOptions(options)
|
||||
|
||||
logger := flat.logger
|
||||
tlsConfigurator := flat.tlsConfigurator
|
||||
connPool := flat.connPool
|
||||
|
||||
// Check the protocol version
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
|
@ -130,14 +136,16 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
|
|||
})
|
||||
}
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
Server: false,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: clientRPCConnMaxIdle,
|
||||
MaxStreams: clientMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
if connPool == nil {
|
||||
connPool = &pool.ConnPool{
|
||||
Server: false,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: clientRPCConnMaxIdle,
|
||||
MaxStreams: clientMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
}
|
||||
}
|
||||
|
||||
// Create client
|
||||
|
@ -202,6 +210,10 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
|
|||
return c, nil
|
||||
}
|
||||
|
||||
func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) {
|
||||
return NewClientWithOptions(config, WithLogger(logger), WithTLSConfigurator(tlsConfigurator))
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the client
|
||||
func (c *Client) Shutdown() error {
|
||||
c.logger.Info("shutting down client")
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
type consulOptions struct {
|
||||
logger hclog.InterceptLogger
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
connPool *pool.ConnPool
|
||||
tokens *token.Store
|
||||
}
|
||||
|
||||
type ConsulOption func(*consulOptions)
|
||||
|
||||
func WithLogger(logger hclog.InterceptLogger) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.tlsConfigurator = tlsConfigurator
|
||||
}
|
||||
}
|
||||
|
||||
func WithConnectionPool(connPool *pool.ConnPool) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.connPool = connPool
|
||||
}
|
||||
}
|
||||
|
||||
func WithTokenStore(tokens *token.Store) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.tokens = tokens
|
||||
}
|
||||
}
|
||||
|
||||
func flattenConsulOptions(options []ConsulOption) consulOptions {
|
||||
var flat consulOptions
|
||||
for _, opt := range options {
|
||||
opt(&flat)
|
||||
}
|
||||
return flat
|
||||
}
|
|
@ -322,6 +322,22 @@ func NewServer(config *Config) (*Server, error) {
|
|||
// NewServerLogger is used to construct a new Consul server from the
|
||||
// configuration, potentially returning an error
|
||||
func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token.Store, tlsConfigurator *tlsutil.Configurator) (*Server, error) {
|
||||
return NewServerWithOptions(config,
|
||||
WithLogger(logger),
|
||||
WithTokenStore(tokens),
|
||||
WithTLSConfigurator(tlsConfigurator))
|
||||
}
|
||||
|
||||
// NewServerWithOptions is used to construct a new Consul server from the configuration
|
||||
// and extra options, potentially returning an error
|
||||
func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, error) {
|
||||
flat := flattenConsulOptions(options)
|
||||
|
||||
logger := flat.logger
|
||||
tokens := flat.tokens
|
||||
tlsConfigurator := flat.tlsConfigurator
|
||||
connPool := flat.connPool
|
||||
|
||||
// Check the protocol version.
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
|
@ -376,14 +392,16 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
|
|||
// Create the shutdown channel - this is closed but never written to.
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
Server: true,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: serverRPCCache,
|
||||
MaxStreams: serverMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
if connPool == nil {
|
||||
connPool = &pool.ConnPool{
|
||||
Server: true,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: serverRPCCache,
|
||||
MaxStreams: serverMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
}
|
||||
}
|
||||
|
||||
serverLogger := logger.NamedIntercept(logging.ConsulServer)
|
||||
|
|
|
@ -4051,7 +4051,7 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) {
|
|||
|
||||
newCfg := *a.Config
|
||||
newCfg.DNSOnlyPassing = false
|
||||
err := a.ReloadConfig(&newCfg)
|
||||
err := a.reloadConfigInternal(&newCfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// only_passing is now false. we should now get two nodes
|
||||
|
@ -6996,7 +6996,7 @@ func TestDNS_ConfigReload(t *testing.T) {
|
|||
newCfg.DNSSOA.Expire = 30
|
||||
newCfg.DNSSOA.Minttl = 40
|
||||
|
||||
err := a.ReloadConfig(&newCfg)
|
||||
err := a.reloadConfigInternal(&newCfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, s := range a.dnsServers {
|
||||
|
@ -7077,7 +7077,7 @@ func TestDNS_ReloadConfig_DuringQuery(t *testing.T) {
|
|||
// reload the config halfway through, that should not affect the ongoing query
|
||||
newCfg := *a.Config
|
||||
newCfg.DNSAllowStale = true
|
||||
a.ReloadConfig(&newCfg)
|
||||
a.reloadConfigInternal(&newCfg)
|
||||
|
||||
select {
|
||||
case in := <-res:
|
||||
|
|
|
@ -1447,7 +1447,7 @@ func TestRPC_HTTPSMaxConnsPerClient(t *testing.T) {
|
|||
// Reload config with higher limit
|
||||
newCfg := *a.config
|
||||
newCfg.HTTPMaxConnsPerClient = 10
|
||||
require.NoError(t, a.ReloadConfig(&newCfg))
|
||||
require.NoError(t, a.reloadConfigInternal(&newCfg))
|
||||
|
||||
// Now another conn should be allowed
|
||||
conn4, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
|
|
|
@ -1914,7 +1914,7 @@ func TestAgent_AliasCheck(t *testing.T) {
|
|||
|
||||
func TestAgent_sendCoordinate(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := agent.NewTestAgent(t, `
|
||||
a := agent.StartTestAgent(t, agent.TestAgent{Overrides: `
|
||||
sync_coordinate_interval_min = "1ms"
|
||||
sync_coordinate_rate_target = 10.0
|
||||
consul = {
|
||||
|
@ -1924,7 +1924,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
update_max_batches = 1
|
||||
}
|
||||
}
|
||||
`)
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
|
|
|
@ -466,7 +466,11 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr) (*Conn,
|
|||
conf.LogOutput = p.LogOutput
|
||||
|
||||
// Create a multiplexed session
|
||||
session, _ := yamux.Client(conn, conf)
|
||||
session, err := yamux.Client(conn, conf)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("Failed to create yamux client: %w", err)
|
||||
}
|
||||
|
||||
// Wrap the connection
|
||||
c := &Conn{
|
||||
|
@ -552,7 +556,11 @@ func (p *ConnPool) RPC(
|
|||
return fmt.Errorf("pool: ConnPool.RPC requires a node name")
|
||||
}
|
||||
|
||||
if method == "AutoEncrypt.Sign" {
|
||||
// TODO (autoconf) probably will want to have a way to invoke the
|
||||
// secure or insecure variant depending on whether its an ongoing
|
||||
// or first time config request. For now though this is fine until
|
||||
// those ongoing requests are implemented.
|
||||
if method == "AutoEncrypt.Sign" || method == "Cluster.AutoConfig" {
|
||||
return p.rpcInsecure(dc, nodeName, addr, method, args, reply)
|
||||
} else {
|
||||
return p.rpc(dc, nodeName, addr, method, args, reply)
|
||||
|
|
|
@ -2,6 +2,9 @@ package agent
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -20,6 +23,7 @@ import (
|
|||
"github.com/hashicorp/go-hclog"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
|
@ -27,6 +31,7 @@ import (
|
|||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -81,6 +86,10 @@ type TestAgent struct {
|
|||
// It is valid after Start().
|
||||
srv *HTTPServer
|
||||
|
||||
// overrides is an hcl config source to use to override otherwise
|
||||
// non-user settable configurations
|
||||
Overrides string
|
||||
|
||||
// Agent is the embedded consul agent.
|
||||
// It is valid after Start().
|
||||
*Agent
|
||||
|
@ -100,6 +109,7 @@ func NewTestAgent(t *testing.T, hcl string) *TestAgent {
|
|||
// The caller is responsible for calling Shutdown() to stop the agent and remove
|
||||
// temporary directories.
|
||||
func StartTestAgent(t *testing.T, a TestAgent) *TestAgent {
|
||||
t.Helper()
|
||||
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
|
||||
if err := a.Start(t); err != nil {
|
||||
r.Fatal(err)
|
||||
|
@ -109,9 +119,31 @@ func StartTestAgent(t *testing.T, a TestAgent) *TestAgent {
|
|||
return &a
|
||||
}
|
||||
|
||||
func TestConfigHCL(nodeID string) string {
|
||||
return fmt.Sprintf(`
|
||||
bind_addr = "127.0.0.1"
|
||||
advertise_addr = "127.0.0.1"
|
||||
datacenter = "dc1"
|
||||
bootstrap = true
|
||||
server = true
|
||||
node_id = "%[1]s"
|
||||
node_name = "Node-%[1]s"
|
||||
connect {
|
||||
enabled = true
|
||||
ca_config {
|
||||
cluster_id = "%[2]s"
|
||||
}
|
||||
}
|
||||
performance {
|
||||
raft_multiplier = 1
|
||||
}`, nodeID, connect.TestClusterID,
|
||||
)
|
||||
}
|
||||
|
||||
// Start starts a test agent. It returns an error if the agent could not be started.
|
||||
// If no error is returned, the caller must call Shutdown() when finished.
|
||||
func (a *TestAgent) Start(t *testing.T) (err error) {
|
||||
t.Helper()
|
||||
if a.Agent != nil {
|
||||
return fmt.Errorf("TestAgent already started")
|
||||
}
|
||||
|
@ -148,6 +180,7 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
|||
// parsing.
|
||||
d = filepath.ToSlash(d)
|
||||
hclDataDir = `data_dir = "` + d + `"`
|
||||
a.DataDir = d
|
||||
}
|
||||
|
||||
logOutput := a.LogOutput
|
||||
|
@ -164,11 +197,27 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
|||
|
||||
portsConfig, returnPortsFn := randomPortsSource(a.UseTLS)
|
||||
a.returnPortsFn = returnPortsFn
|
||||
a.Config = TestConfig(logger,
|
||||
portsConfig,
|
||||
config.Source{Name: name, Format: "hcl", Data: a.HCL},
|
||||
config.Source{Name: name + ".data_dir", Format: "hcl", Data: hclDataDir},
|
||||
)
|
||||
|
||||
nodeID := NodeID()
|
||||
|
||||
opts := []AgentOption{
|
||||
WithLogger(logger),
|
||||
WithBuilderOpts(config.BuilderOpts{
|
||||
HCL: []string{
|
||||
TestConfigHCL(nodeID),
|
||||
portsConfig,
|
||||
a.HCL,
|
||||
hclDataDir,
|
||||
},
|
||||
}),
|
||||
WithOverrides(config.Source{
|
||||
Name: "test-overrides",
|
||||
Format: "hcl",
|
||||
Data: a.Overrides},
|
||||
config.DefaultConsulSource(),
|
||||
config.DevConsulSource(),
|
||||
),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil && a.returnPortsFn != nil {
|
||||
|
@ -180,7 +229,7 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
|||
// write the keyring
|
||||
if a.Key != "" {
|
||||
writeKey := func(key, filename string) error {
|
||||
path := filepath.Join(a.Config.DataDir, filename)
|
||||
path := filepath.Join(a.DataDir, filename)
|
||||
if err := initKeyring(path, key); err != nil {
|
||||
cleanupTmpDir()
|
||||
return fmt.Errorf("Error creating keyring %s: %s", path, err)
|
||||
|
@ -197,13 +246,14 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
agent, err := New(a.Config, logger)
|
||||
agent, err := New(opts...)
|
||||
if err != nil {
|
||||
cleanupTmpDir()
|
||||
return fmt.Errorf("Error creating agent: %s", err)
|
||||
}
|
||||
|
||||
agent.LogOutput = logOutput
|
||||
a.Config = agent.GetConfig()
|
||||
|
||||
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
|
||||
|
||||
id := string(a.Config.NodeID)
|
||||
|
@ -224,6 +274,7 @@ func (a *TestAgent) Start(t *testing.T) (err error) {
|
|||
if err := a.waitForUp(); err != nil {
|
||||
cleanupTmpDir()
|
||||
a.Shutdown()
|
||||
t.Logf("Error while waiting for test agent to start: %v", err)
|
||||
return errwrap.Wrapf(name+": {{err}}", err)
|
||||
}
|
||||
|
||||
|
@ -271,7 +322,11 @@ func (a *TestAgent) waitForUp() error {
|
|||
req := httptest.NewRequest("GET", "/v1/agent/self", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
_, err := a.httpServers[0].AgentSelf(resp, req)
|
||||
if err != nil || resp.Code != 200 {
|
||||
if acl.IsErrPermissionDenied(err) || resp.Code == 403 {
|
||||
// permission denied is enough to show that the client is
|
||||
// connected to the servers as it would get a 503 if
|
||||
// it couldn't connect to them.
|
||||
} else if err != nil && resp.Code != 200 {
|
||||
retErr = fmt.Errorf("failed OK response: %v", err)
|
||||
continue
|
||||
}
|
||||
|
@ -372,7 +427,7 @@ func (a *TestAgent) consulConfig() *consul.Config {
|
|||
// chance of port conflicts for concurrently executed test binaries.
|
||||
// Instead of relying on one set of ports to be sufficient we retry
|
||||
// starting the agent with different ports on port conflict.
|
||||
func randomPortsSource(tls bool) (src config.Source, returnPortsFn func()) {
|
||||
func randomPortsSource(tls bool) (data string, returnPortsFn func()) {
|
||||
ports := freeport.MustTake(7)
|
||||
|
||||
var http, https int
|
||||
|
@ -384,21 +439,17 @@ func randomPortsSource(tls bool) (src config.Source, returnPortsFn func()) {
|
|||
https = -1
|
||||
}
|
||||
|
||||
return config.Source{
|
||||
Name: "ports",
|
||||
Format: "hcl",
|
||||
Data: `
|
||||
ports = {
|
||||
dns = ` + strconv.Itoa(ports[0]) + `
|
||||
http = ` + strconv.Itoa(http) + `
|
||||
https = ` + strconv.Itoa(https) + `
|
||||
serf_lan = ` + strconv.Itoa(ports[3]) + `
|
||||
serf_wan = ` + strconv.Itoa(ports[4]) + `
|
||||
server = ` + strconv.Itoa(ports[5]) + `
|
||||
grpc = ` + strconv.Itoa(ports[6]) + `
|
||||
}
|
||||
`,
|
||||
}, func() { freeport.Return(ports) }
|
||||
return `
|
||||
ports = {
|
||||
dns = ` + strconv.Itoa(ports[0]) + `
|
||||
http = ` + strconv.Itoa(http) + `
|
||||
https = ` + strconv.Itoa(https) + `
|
||||
serf_lan = ` + strconv.Itoa(ports[3]) + `
|
||||
serf_wan = ` + strconv.Itoa(ports[4]) + `
|
||||
server = ` + strconv.Itoa(ports[5]) + `
|
||||
grpc = ` + strconv.Itoa(ports[6]) + `
|
||||
}
|
||||
`, func() { freeport.Return(ports) }
|
||||
}
|
||||
|
||||
func NodeID() string {
|
||||
|
@ -518,34 +569,34 @@ func TestACLConfigNew() string {
|
|||
}
|
||||
|
||||
var aclConfigTpl = template.Must(template.New("ACL Config").Parse(`
|
||||
{{if ne .PrimaryDatacenter ""}}
|
||||
{{- if ne .PrimaryDatacenter "" -}}
|
||||
primary_datacenter = "{{ .PrimaryDatacenter }}"
|
||||
{{end}}
|
||||
{{end -}}
|
||||
acl {
|
||||
enabled = true
|
||||
{{if ne .DefaultPolicy ""}}
|
||||
{{- if ne .DefaultPolicy ""}}
|
||||
default_policy = "{{ .DefaultPolicy }}"
|
||||
{{end}}
|
||||
{{- end}}
|
||||
enable_token_replication = {{printf "%t" .EnableTokenReplication }}
|
||||
{{if .HasConfiguredTokens }}
|
||||
{{- if .HasConfiguredTokens}}
|
||||
tokens {
|
||||
{{if ne .MasterToken ""}}
|
||||
{{- if ne .MasterToken ""}}
|
||||
master = "{{ .MasterToken }}"
|
||||
{{end}}
|
||||
{{if ne .AgentToken ""}}
|
||||
{{- end}}
|
||||
{{- if ne .AgentToken ""}}
|
||||
agent = "{{ .AgentToken }}"
|
||||
{{end}}
|
||||
{{if ne .AgentMasterToken "" }}
|
||||
{{- end}}
|
||||
{{- if ne .AgentMasterToken "" }}
|
||||
agent_master = "{{ .AgentMasterToken }}"
|
||||
{{end}}
|
||||
{{if ne .DefaultToken "" }}
|
||||
{{- end}}
|
||||
{{- if ne .DefaultToken "" }}
|
||||
default = "{{ .DefaultToken }}"
|
||||
{{end}}
|
||||
{{if ne .ReplicationToken "" }}
|
||||
{{- end}}
|
||||
{{- if ne .ReplicationToken "" }}
|
||||
replication = "{{ .ReplicationToken }}"
|
||||
{{end}}
|
||||
{{- end}}
|
||||
}
|
||||
{{end}}
|
||||
{{- end}}
|
||||
}
|
||||
`))
|
||||
|
||||
|
@ -564,3 +615,43 @@ func TestACLConfigWithParams(params *TestACLConfigParams) string {
|
|||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// testTLSCertificates Generates a TLS CA and server key/cert and returns them
|
||||
// in PEM encoded form.
|
||||
func testTLSCertificates(serverName string) (cert string, key string, cacert string, err error) {
|
||||
// generate CA
|
||||
serial, err := tlsutil.GenerateSerialNumber()
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
signer, err := ecdsa.GenerateKey(elliptic.P256(), rand.New(rand.NewSource(99)))
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
ca, err := tlsutil.GenerateCA(signer, serial, 365, nil)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
// generate leaf
|
||||
serial, err = tlsutil.GenerateSerialNumber()
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
cert, privateKey, err := tlsutil.GenerateCert(
|
||||
signer,
|
||||
ca,
|
||||
serial,
|
||||
"Test Cert Name",
|
||||
365,
|
||||
[]string{serverName},
|
||||
nil,
|
||||
[]x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
|
||||
)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
return cert, privateKey, ca, nil
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ func TestAuthMethodCreateCommand(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
client := a.Client()
|
||||
|
||||
t.Run("type required", func(t *testing.T) {
|
||||
|
@ -201,7 +201,7 @@ func TestAuthMethodCreateCommand_JSON(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
client := a.Client()
|
||||
|
||||
t.Run("type required", func(t *testing.T) {
|
||||
|
@ -369,7 +369,7 @@ func TestAuthMethodCreateCommand_k8s(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
client := a.Client()
|
||||
|
||||
t.Run("k8s host required", func(t *testing.T) {
|
||||
|
@ -512,7 +512,7 @@ func TestAuthMethodCreateCommand_config(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
client := a.Client()
|
||||
|
||||
checkMethod := func(t *testing.T, methodName string) {
|
||||
|
|
|
@ -40,7 +40,7 @@ func TestPolicyReadCommand(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
cmd := New(ui)
|
||||
|
@ -86,7 +86,7 @@ func TestPolicyReadCommand_JSON(t *testing.T) {
|
|||
}`)
|
||||
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WithToken("root"))
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
cmd := New(ui)
|
||||
|
|
|
@ -18,9 +18,7 @@ import (
|
|||
"github.com/hashicorp/consul/service_os"
|
||||
"github.com/hashicorp/go-checkpoint"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/mitchellh/cli"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
func New(ui cli.Ui, revision, version, versionPre, versionHuman string, shutdownCh <-chan struct{}) *cmd {
|
||||
|
@ -80,25 +78,6 @@ func (c *cmd) Run(args []string) int {
|
|||
return code
|
||||
}
|
||||
|
||||
// readConfig is responsible for setup of our configuration using
|
||||
// the command line and any file configs
|
||||
func (c *cmd) readConfig() *config.RuntimeConfig {
|
||||
b, err := config.NewBuilder(c.flagArgs)
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return nil
|
||||
}
|
||||
cfg, err := b.BuildAndValidate()
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return nil
|
||||
}
|
||||
for _, w := range b.Warnings {
|
||||
c.UI.Warn(w)
|
||||
}
|
||||
return &cfg
|
||||
}
|
||||
|
||||
// checkpointResults is used to handler periodic results from our update checker
|
||||
func (c *cmd) checkpointResults(results *checkpoint.CheckResponse, err error) {
|
||||
if err != nil {
|
||||
|
@ -185,29 +164,22 @@ func (c *cmd) run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
config := c.readConfig()
|
||||
if config == nil {
|
||||
logGate := logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
||||
|
||||
agentOptions := []agent.AgentOption{
|
||||
agent.WithBuilderOpts(c.flagArgs),
|
||||
agent.WithCLI(c.UI),
|
||||
agent.WithLogWriter(&logGate),
|
||||
}
|
||||
|
||||
agent, err := agent.New(agentOptions...)
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Setup the log outputs
|
||||
logConfig := &logging.Config{
|
||||
LogLevel: config.LogLevel,
|
||||
LogJSON: config.LogJSON,
|
||||
Name: logging.Agent,
|
||||
EnableSyslog: config.EnableSyslog,
|
||||
SyslogFacility: config.SyslogFacility,
|
||||
LogFilePath: config.LogFile,
|
||||
LogRotateDuration: config.LogRotateDuration,
|
||||
LogRotateBytes: config.LogRotateBytes,
|
||||
LogRotateMaxFiles: config.LogRotateMaxFiles,
|
||||
}
|
||||
logger, logGate, logOutput, ok := logging.Setup(logConfig, c.UI)
|
||||
if !ok {
|
||||
return 1
|
||||
}
|
||||
|
||||
c.logger = logger
|
||||
config := agent.GetConfig()
|
||||
c.logger = agent.GetLogger()
|
||||
|
||||
//Setup gate to check if we should output CLI information
|
||||
cli := GatedUi{
|
||||
|
@ -215,26 +187,8 @@ func (c *cmd) run(args []string) int {
|
|||
ui: c.UI,
|
||||
}
|
||||
|
||||
// Setup gRPC logger to use the same output/filtering
|
||||
grpclog.SetLoggerV2(logging.NewGRPCLogger(logConfig, c.logger))
|
||||
|
||||
memSink, err := lib.InitTelemetry(config.Telemetry)
|
||||
if err != nil {
|
||||
c.logger.Error(err.Error())
|
||||
logGate.Flush()
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create the agent
|
||||
cli.output("Starting Consul agent...")
|
||||
agent, err := agent.New(config, c.logger)
|
||||
if err != nil {
|
||||
c.logger.Error("Error creating agent", "error", err)
|
||||
logGate.Flush()
|
||||
return 1
|
||||
}
|
||||
agent.LogOutput = logOutput
|
||||
agent.MemSink = memSink
|
||||
|
||||
segment := config.SegmentName
|
||||
if config.ServerMode {
|
||||
|
@ -327,13 +281,9 @@ func (c *cmd) run(args []string) int {
|
|||
|
||||
for {
|
||||
var sig os.Signal
|
||||
var reloadErrCh chan error
|
||||
select {
|
||||
case s := <-signalCh:
|
||||
sig = s
|
||||
case ch := <-agent.ReloadCh():
|
||||
sig = syscall.SIGHUP
|
||||
reloadErrCh = ch
|
||||
case <-service_os.Shutdown_Channel():
|
||||
sig = os.Interrupt
|
||||
case <-c.shutdownCh:
|
||||
|
@ -353,18 +303,11 @@ func (c *cmd) run(args []string) int {
|
|||
case syscall.SIGHUP:
|
||||
c.logger.Info("Caught", "signal", sig)
|
||||
|
||||
conf, err := c.handleReload(agent, config)
|
||||
if conf != nil {
|
||||
config = conf
|
||||
}
|
||||
err := agent.ReloadConfig()
|
||||
if err != nil {
|
||||
c.logger.Error("Reload config failed", "error", err)
|
||||
}
|
||||
// Send result back if reload was called via HTTP
|
||||
if reloadErrCh != nil {
|
||||
reloadErrCh <- err
|
||||
}
|
||||
|
||||
config = agent.GetConfig()
|
||||
default:
|
||||
c.logger.Info("Caught", "signal", sig)
|
||||
|
||||
|
@ -400,37 +343,6 @@ func (c *cmd) run(args []string) int {
|
|||
}
|
||||
}
|
||||
|
||||
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
|
||||
func (c *cmd) handleReload(agent *agent.Agent, cfg *config.RuntimeConfig) (*config.RuntimeConfig, error) {
|
||||
c.logger.Info("Reloading configuration...")
|
||||
var errs error
|
||||
newCfg := c.readConfig()
|
||||
if newCfg == nil {
|
||||
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs"))
|
||||
return cfg, errs
|
||||
}
|
||||
|
||||
// Change the log level
|
||||
if logging.ValidateLogLevel(newCfg.LogLevel) {
|
||||
c.logger.SetLevel(logging.LevelFromString(newCfg.LogLevel))
|
||||
} else {
|
||||
errs = multierror.Append(fmt.Errorf(
|
||||
"Invalid log level: %s. Valid log levels are: %v",
|
||||
newCfg.LogLevel, logging.AllowedLogLevels()))
|
||||
|
||||
// Keep the current log level
|
||||
newCfg.LogLevel = cfg.LogLevel
|
||||
|
||||
}
|
||||
|
||||
if err := agent.ReloadConfig(newCfg); err != nil {
|
||||
errs = multierror.Append(fmt.Errorf(
|
||||
"Failed to reload configs: %v", err))
|
||||
}
|
||||
|
||||
return newCfg, errs
|
||||
}
|
||||
|
||||
func (g *GatedUi) output(s string) {
|
||||
if !g.JSONoutput {
|
||||
g.ui.Output(s)
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/hashicorp/consul/testrpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/mitchellh/cli"
|
||||
|
@ -205,67 +204,3 @@ func TestBadDataDirPermissions(t *testing.T) {
|
|||
t.Fatalf("expected permission denied error, got: %s", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReloadLoggerFail(t *testing.T) {
|
||||
a := agent.NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
cmd := New(ui, "", "", "", "", nil)
|
||||
|
||||
bindAddr := a.Config.BindAddr.String()
|
||||
cmd.flagArgs.Config.BindAddr = &bindAddr
|
||||
cmd.flagArgs.Config.DataDir = &a.Config.DataDir
|
||||
|
||||
cmd.logger = testutil.Logger(t)
|
||||
|
||||
newLogLevel := "BLAH"
|
||||
cmd.flagArgs.Config.LogLevel = &newLogLevel
|
||||
|
||||
oldCfg := config.RuntimeConfig{
|
||||
LogLevel: "INFO",
|
||||
}
|
||||
cfg, err := cmd.handleReload(a.Agent, &oldCfg)
|
||||
if err == nil {
|
||||
t.Fatal("Should fail with bad log level")
|
||||
}
|
||||
|
||||
if !strings.Contains(err.Error(), "Invalid log level") {
|
||||
t.Fatalf("expected invalid log level error, got: %s", err)
|
||||
}
|
||||
if cfg.LogLevel != "INFO" {
|
||||
t.Fatalf("expected log level to stay the same, got: %s", cfg.LogLevel)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReloadLoggerSuccess(t *testing.T) {
|
||||
a := agent.NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
cmd := New(ui, "", "", "", "", nil)
|
||||
|
||||
bindAddr := a.Config.BindAddr.String()
|
||||
cmd.flagArgs.Config.BindAddr = &bindAddr
|
||||
cmd.flagArgs.Config.DataDir = &a.Config.DataDir
|
||||
|
||||
cmd.logger = testutil.Logger(t)
|
||||
|
||||
newLogLevel := "ERROR"
|
||||
cmd.flagArgs.Config.LogLevel = &newLogLevel
|
||||
|
||||
oldCfg := config.RuntimeConfig{
|
||||
LogLevel: "INFO",
|
||||
}
|
||||
cfg, err := cmd.handleReload(a.Agent, &oldCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if cfg.LogLevel != "ERROR" {
|
||||
t.Fatalf("expected log level to change to 'ERROR', got: %s", cfg.LogLevel)
|
||||
}
|
||||
if cmd.logger.IsWarn() || !cmd.logger.IsError() {
|
||||
t.Fatal("expected logger level to change to 'ERROR'")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package proxy
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -136,8 +137,12 @@ func (c *cmd) Run(args []string) int {
|
|||
Name: logging.Proxy,
|
||||
LogJSON: c.logJSON,
|
||||
}
|
||||
logger, logGate, _, ok := logging.Setup(logConfig, c.UI)
|
||||
if !ok {
|
||||
|
||||
logGate := logging.GatedWriter{Writer: &cli.UiWriter{Ui: c.UI}}
|
||||
|
||||
logger, _, err := logging.Setup(logConfig, []io.Writer{&logGate})
|
||||
if err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
c.logger = logger
|
||||
|
|
|
@ -21,11 +21,6 @@ func TestReloadCommand(t *testing.T) {
|
|||
defer a.Shutdown()
|
||||
|
||||
// Setup a dummy response to errCh to simulate a successful reload
|
||||
go func() {
|
||||
errCh := <-a.ReloadCh()
|
||||
errCh <- nil
|
||||
}()
|
||||
|
||||
ui := cli.NewMockUi()
|
||||
c := New(ui)
|
||||
args := []string{"-http-addr=" + a.HTTPAddr()}
|
||||
|
|
|
@ -127,7 +127,11 @@ func TestService_Dial(t *testing.T) {
|
|||
func TestService_ServerTLSConfig(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
a := agent.StartTestAgent(t, agent.TestAgent{Name: "007"})
|
||||
a := agent.StartTestAgent(t, agent.TestAgent{Name: "007", Overrides: `
|
||||
connect {
|
||||
test_ca_leaf_root_change_spread = "1ns"
|
||||
}
|
||||
`})
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
client := a.Client()
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
gsyslog "github.com/hashicorp/go-syslog"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
// Config is used to set up logging.
|
||||
|
@ -51,6 +50,8 @@ var (
|
|||
logRotateBytes int
|
||||
)
|
||||
|
||||
type LogSetupErrorFn func(string)
|
||||
|
||||
// Setup is used to perform setup of several logging objects:
|
||||
//
|
||||
// * A hclog.Logger is used to perform filtering by log level and write to io.Writer.
|
||||
|
@ -62,17 +63,11 @@ var (
|
|||
// The provided ui object will get any log messages related to setting up
|
||||
// logging itself, and will also be hooked up to the gated logger. The final bool
|
||||
// parameter indicates if logging was set up successfully.
|
||||
func Setup(config *Config, ui cli.Ui) (hclog.InterceptLogger, *GatedWriter, io.Writer, bool) {
|
||||
// The gated writer buffers logs at startup and holds until it's flushed.
|
||||
logGate := &GatedWriter{
|
||||
Writer: &cli.UiWriter{Ui: ui},
|
||||
}
|
||||
|
||||
func Setup(config *Config, writers []io.Writer) (hclog.InterceptLogger, io.Writer, error) {
|
||||
if !ValidateLogLevel(config.LogLevel) {
|
||||
ui.Error(fmt.Sprintf(
|
||||
"Invalid log level: %s. Valid log levels are: %v",
|
||||
config.LogLevel, allowedLogLevels))
|
||||
return nil, nil, nil, false
|
||||
return nil, nil, fmt.Errorf("Invalid log level: %s. Valid log levels are: %v",
|
||||
config.LogLevel,
|
||||
allowedLogLevels)
|
||||
}
|
||||
|
||||
// Set up syslog if it's enabled.
|
||||
|
@ -87,20 +82,15 @@ func Setup(config *Config, ui cli.Ui) (hclog.InterceptLogger, *GatedWriter, io.W
|
|||
break
|
||||
}
|
||||
|
||||
ui.Error(fmt.Sprintf("Syslog setup error: %v", err))
|
||||
if i == retries {
|
||||
timeout := time.Duration(retries) * delay
|
||||
ui.Error(fmt.Sprintf("Syslog setup did not succeed within timeout (%s).", timeout.String()))
|
||||
return nil, nil, nil, false
|
||||
return nil, nil, fmt.Errorf("Syslog setup did not succeed within timeout (%s).", timeout.String())
|
||||
}
|
||||
|
||||
ui.Error(fmt.Sprintf("Retrying syslog setup in %s...", delay.String()))
|
||||
time.Sleep(delay)
|
||||
}
|
||||
}
|
||||
writers := []io.Writer{logGate}
|
||||
|
||||
var logOutput io.Writer
|
||||
if syslog != nil {
|
||||
writers = append(writers, syslog)
|
||||
}
|
||||
|
@ -131,13 +121,12 @@ func Setup(config *Config, ui cli.Ui) (hclog.InterceptLogger, *GatedWriter, io.W
|
|||
MaxFiles: config.LogRotateMaxFiles,
|
||||
}
|
||||
if err := logFile.openNew(); err != nil {
|
||||
ui.Error(fmt.Sprintf("Failed to setup logging: %v", err))
|
||||
return nil, nil, nil, false
|
||||
return nil, nil, fmt.Errorf("Failed to setup logging: %w", err)
|
||||
}
|
||||
writers = append(writers, logFile)
|
||||
}
|
||||
|
||||
logOutput = io.MultiWriter(writers...)
|
||||
logOutput := io.MultiWriter(writers...)
|
||||
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Level: LevelFromString(config.LogLevel),
|
||||
|
@ -146,5 +135,5 @@ func Setup(config *Config, ui cli.Ui) (hclog.InterceptLogger, *GatedWriter, io.W
|
|||
JSONFormat: config.LogJSON,
|
||||
})
|
||||
|
||||
return logger, logGate, logOutput, true
|
||||
return logger, logOutput, nil
|
||||
}
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
package logging
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -16,24 +18,19 @@ func TestLogger_SetupBasic(t *testing.T) {
|
|||
cfg := &Config{
|
||||
LogLevel: "INFO",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
|
||||
logger, gatedWriter, writer, ok := Setup(cfg, ui)
|
||||
require.True(ok)
|
||||
require.NotNil(gatedWriter)
|
||||
logger, writer, err := Setup(cfg, nil)
|
||||
require.NoError(err)
|
||||
require.NotNil(writer)
|
||||
require.NotNil(logger)
|
||||
}
|
||||
|
||||
func TestLogger_SetupInvalidLogLevel(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
cfg := &Config{}
|
||||
ui := cli.NewMockUi()
|
||||
|
||||
_, _, _, ok := Setup(cfg, ui)
|
||||
require.False(ok)
|
||||
require.Contains(ui.ErrorWriter.String(), "Invalid log level")
|
||||
_, _, err := Setup(cfg, nil)
|
||||
testutil.RequireErrorContains(t, err, "Invalid log level")
|
||||
}
|
||||
|
||||
func TestLogger_SetupLoggerErrorLevel(t *testing.T) {
|
||||
|
@ -63,20 +60,19 @@ func TestLogger_SetupLoggerErrorLevel(t *testing.T) {
|
|||
|
||||
c.before(&cfg)
|
||||
require := require.New(t)
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, _, ok := Setup(&cfg, ui)
|
||||
require.True(ok)
|
||||
logger, _, err := Setup(&cfg, []io.Writer{&buf})
|
||||
require.NoError(err)
|
||||
require.NotNil(logger)
|
||||
require.NotNil(gatedWriter)
|
||||
|
||||
gatedWriter.Flush()
|
||||
|
||||
logger.Error("test error msg")
|
||||
logger.Info("test info msg")
|
||||
|
||||
require.Contains(ui.OutputWriter.String(), "[ERROR] test error msg")
|
||||
require.NotContains(ui.OutputWriter.String(), "[INFO] test info msg")
|
||||
output := buf.String()
|
||||
|
||||
require.Contains(output, "[ERROR] test error msg")
|
||||
require.NotContains(output, "[INFO] test info msg")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -87,20 +83,19 @@ func TestLogger_SetupLoggerDebugLevel(t *testing.T) {
|
|||
cfg := &Config{
|
||||
LogLevel: "DEBUG",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, _, ok := Setup(cfg, ui)
|
||||
require.True(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.NoError(err)
|
||||
require.NotNil(logger)
|
||||
require.NotNil(gatedWriter)
|
||||
|
||||
gatedWriter.Flush()
|
||||
|
||||
logger.Info("test info msg")
|
||||
logger.Debug("test debug msg")
|
||||
|
||||
require.Contains(ui.OutputWriter.String(), "[INFO] test info msg")
|
||||
require.Contains(ui.OutputWriter.String(), "[DEBUG] test debug msg")
|
||||
output := buf.String()
|
||||
|
||||
require.Contains(output, "[INFO] test info msg")
|
||||
require.Contains(output, "[DEBUG] test debug msg")
|
||||
}
|
||||
|
||||
func TestLogger_SetupLoggerWithName(t *testing.T) {
|
||||
|
@ -110,18 +105,15 @@ func TestLogger_SetupLoggerWithName(t *testing.T) {
|
|||
LogLevel: "DEBUG",
|
||||
Name: "test-system",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, _, ok := Setup(cfg, ui)
|
||||
require.True(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.NoError(err)
|
||||
require.NotNil(logger)
|
||||
require.NotNil(gatedWriter)
|
||||
|
||||
gatedWriter.Flush()
|
||||
|
||||
logger.Warn("test warn msg")
|
||||
|
||||
require.Contains(ui.OutputWriter.String(), "[WARN] test-system: test warn msg")
|
||||
require.Contains(buf.String(), "[WARN] test-system: test warn msg")
|
||||
}
|
||||
|
||||
func TestLogger_SetupLoggerWithJSON(t *testing.T) {
|
||||
|
@ -132,19 +124,16 @@ func TestLogger_SetupLoggerWithJSON(t *testing.T) {
|
|||
LogJSON: true,
|
||||
Name: "test-system",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, _, ok := Setup(cfg, ui)
|
||||
require.True(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.NoError(err)
|
||||
require.NotNil(logger)
|
||||
require.NotNil(gatedWriter)
|
||||
|
||||
gatedWriter.Flush()
|
||||
|
||||
logger.Warn("test warn msg")
|
||||
|
||||
var jsonOutput map[string]string
|
||||
err := json.Unmarshal(ui.OutputWriter.Bytes(), &jsonOutput)
|
||||
err = json.Unmarshal(buf.Bytes(), &jsonOutput)
|
||||
require.NoError(err)
|
||||
require.Contains(jsonOutput, "@level")
|
||||
require.Equal(jsonOutput["@level"], "warn")
|
||||
|
@ -163,13 +152,11 @@ func TestLogger_SetupLoggerWithValidLogPath(t *testing.T) {
|
|||
LogLevel: "INFO",
|
||||
LogFilePath: tmpDir + "/",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, writer, ok := Setup(cfg, ui)
|
||||
require.True(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.NoError(err)
|
||||
require.NotNil(logger)
|
||||
require.NotNil(gatedWriter)
|
||||
require.NotNil(writer)
|
||||
}
|
||||
|
||||
func TestLogger_SetupLoggerWithInValidLogPath(t *testing.T) {
|
||||
|
@ -180,14 +167,12 @@ func TestLogger_SetupLoggerWithInValidLogPath(t *testing.T) {
|
|||
LogLevel: "INFO",
|
||||
LogFilePath: "nonexistentdir/",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, writer, ok := Setup(cfg, ui)
|
||||
require.Contains(ui.ErrorWriter.String(), "no such file or directory")
|
||||
require.False(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.Error(err)
|
||||
require.True(errors.Is(err, os.ErrNotExist))
|
||||
require.Nil(logger)
|
||||
require.Nil(gatedWriter)
|
||||
require.Nil(writer)
|
||||
}
|
||||
|
||||
func TestLogger_SetupLoggerWithInValidLogPathPermission(t *testing.T) {
|
||||
|
@ -203,12 +188,10 @@ func TestLogger_SetupLoggerWithInValidLogPathPermission(t *testing.T) {
|
|||
LogLevel: "INFO",
|
||||
LogFilePath: tmpDir + "/",
|
||||
}
|
||||
ui := cli.NewMockUi()
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger, gatedWriter, writer, ok := Setup(cfg, ui)
|
||||
require.Contains(ui.ErrorWriter.String(), "permission denied")
|
||||
require.False(ok)
|
||||
logger, _, err := Setup(cfg, []io.Writer{&buf})
|
||||
require.Error(err)
|
||||
require.True(errors.Is(err, os.ErrPermission))
|
||||
require.Nil(logger)
|
||||
require.Nil(gatedWriter)
|
||||
require.Nil(writer)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ const (
|
|||
Agent string = "agent"
|
||||
AntiEntropy string = "anti_entropy"
|
||||
AutoEncrypt string = "auto_encrypt"
|
||||
AutoConfig string = "auto_config"
|
||||
Autopilot string = "autopilot"
|
||||
AWS string = "aws"
|
||||
Azure string = "azure"
|
||||
|
|
Loading…
Reference in New Issue