From d6e05482abcc8436669dc176d216348afab8c26f Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Fri, 19 Jun 2020 15:16:00 -0400 Subject: [PATCH] Allow cancelling startup when performing auto-config (#8157) Co-authored-by: Daniel Nephin --- agent/agent.go | 48 +++++++++++++++---------------- agent/auto-config/auto_config.go | 12 ++++++-- agent/config/builder.go | 13 +++++++-- agent/config/runtime_test.go | 4 +-- agent/consul/auto_encrypt.go | 11 +++---- agent/consul/auto_encrypt_test.go | 11 ++++--- agent/testagent.go | 3 +- command/agent/agent.go | 16 +++++------ 8 files changed, 68 insertions(+), 50 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 29b4172c74..d695d0ede3 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -257,8 +257,6 @@ type Agent struct { shutdownCh chan struct{} shutdownLock sync.Mutex - InterruptStartCh chan struct{} - // joinLANNotifier is called after a successful JoinLAN. joinLANNotifier notifier @@ -414,23 +412,22 @@ func New(options ...AgentOption) (*Agent, error) { // Create most of the agent a := Agent{ - checkReapAfter: make(map[structs.CheckID]time.Duration), - checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor), - checkTTLs: make(map[structs.CheckID]*checks.CheckTTL), - checkHTTPs: make(map[structs.CheckID]*checks.CheckHTTP), - checkTCPs: make(map[structs.CheckID]*checks.CheckTCP), - checkGRPCs: make(map[structs.CheckID]*checks.CheckGRPC), - checkDockers: make(map[structs.CheckID]*checks.CheckDocker), - checkAliases: make(map[structs.CheckID]*checks.CheckAlias), - eventCh: make(chan serf.UserEvent, 1024), - eventBuf: make([]*UserEvent, 256), - joinLANNotifier: &systemd.Notifier{}, - retryJoinCh: make(chan error), - shutdownCh: make(chan struct{}), - InterruptStartCh: make(chan struct{}), - endpoints: make(map[string]string), - tokens: new(token.Store), - logger: flat.logger, + checkReapAfter: make(map[structs.CheckID]time.Duration), + checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor), + checkTTLs: make(map[structs.CheckID]*checks.CheckTTL), + checkHTTPs: make(map[structs.CheckID]*checks.CheckHTTP), + checkTCPs: make(map[structs.CheckID]*checks.CheckTCP), + checkGRPCs: make(map[structs.CheckID]*checks.CheckGRPC), + checkDockers: make(map[structs.CheckID]*checks.CheckDocker), + checkAliases: make(map[structs.CheckID]*checks.CheckAlias), + eventCh: make(chan serf.UserEvent, 1024), + eventBuf: make([]*UserEvent, 256), + joinLANNotifier: &systemd.Notifier{}, + retryJoinCh: make(chan error), + shutdownCh: make(chan struct{}), + endpoints: make(map[string]string), + tokens: new(token.Store), + logger: flat.logger, } // parse the configuration and handle the error/warnings @@ -599,13 +596,13 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config { } // Start verifies its configuration and runs an agent's various subprocesses. -func (a *Agent) Start() error { +func (a *Agent) Start(ctx context.Context) error { a.stateLock.Lock() defer a.stateLock.Unlock() // This needs to be done early on as it will potentially alter the configuration // and then how other bits are brought up - c, err := a.autoConf.InitialConfiguration(&lib.StopChannelContext{StopCh: a.shutdownCh}) + c, err := a.autoConf.InitialConfiguration(ctx) if err != nil { return err } @@ -709,7 +706,7 @@ func (a *Agent) Start() error { a.registerCache() if a.config.AutoEncryptTLS && !a.config.ServerMode { - reply, err := a.setupClientAutoEncrypt() + reply, err := a.setupClientAutoEncrypt(ctx) if err != nil { return fmt.Errorf("AutoEncrypt failed: %s", err) } @@ -822,7 +819,7 @@ func (a *Agent) Start() error { return nil } -func (a *Agent) setupClientAutoEncrypt() (*structs.SignedResponse, error) { +func (a *Agent) setupClientAutoEncrypt(ctx context.Context) (*structs.SignedResponse, error) { client := a.delegate.(*consul.Client) addrs := a.config.StartJoinAddrsLAN @@ -832,7 +829,7 @@ func (a *Agent) setupClientAutoEncrypt() (*structs.SignedResponse, error) { } addrs = append(addrs, retryJoinAddrs(disco, retryJoinSerfVariant, "LAN", a.config.RetryJoinLAN, a.logger)...) - reply, priv, err := client.RequestAutoEncryptCerts(addrs, a.config.ServerPort, a.tokens.AgentToken(), a.InterruptStartCh) + reply, priv, err := client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken()) if err != nil { return nil, err } @@ -960,7 +957,8 @@ func (a *Agent) setupClientAutoEncryptWatching(rootsReq *structs.DCSpecificReque // check auto encrypt client cert expiration if a.tlsConfigurator.AutoEncryptCertExpired() { autoLogger.Debug("client certificate expired.") - reply, err := a.setupClientAutoEncrypt() + // Background because the context is mainly useful when the agent is first starting up. + reply, err := a.setupClientAutoEncrypt(context.Background()) if err != nil { autoLogger.Error("client certificate expired, failed to renew", "error", err) // in case of an error, try again in one minute diff --git a/agent/auto-config/auto_config.go b/agent/auto-config/auto_config.go index e04becc239..8a8565b23b 100644 --- a/agent/auto-config/auto_config.go +++ b/agent/auto-config/auto_config.go @@ -262,6 +262,7 @@ func (ac *AutoConfig) InitialConfiguration(ctx context.Context) (*config.Runtime } if !ready { + ac.logger.Info("retrieving initial agent auto configuration remotely") if err := ac.getInitialConfiguration(ctx); err != nil { return nil, err } @@ -447,7 +448,7 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context) (bool, er return false, ctx.Err() } - ac.logger.Debug("Making Cluster.AutoConfig RPC", "addr", addr.String()) + ac.logger.Debug("making Cluster.AutoConfig RPC", "addr", addr.String()) if err = ac.directRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "Cluster.AutoConfig", &request, &reply); err != nil { ac.logger.Error("AutoConfig RPC failed", "addr", addr.String(), "error", err) continue @@ -457,7 +458,7 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context) (bool, er } } - return false, nil + return false, ctx.Err() } // getInitialConfiguration implements a loop to retry calls to getInitialConfigurationOnce. @@ -469,11 +470,16 @@ func (ac *AutoConfig) getInitialConfiguration(ctx context.Context) error { for { select { case <-wait: - if done, err := ac.getInitialConfigurationOnce(ctx); done { + done, err := ac.getInitialConfigurationOnce(ctx) + if done { return err } + if err != nil { + ac.logger.Error(err.Error()) + } wait = ac.waiter.Failed() case <-ctx.Done(): + ac.logger.Info("interrupted during initial auto configuration", "err", ctx.Err()) return ctx.Err() } } diff --git a/agent/config/builder.go b/agent/config/builder.go index b9217c1936..8a6471fcc7 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1923,6 +1923,15 @@ func (b *Builder) autoConfigVal(raw AutoConfigRaw) AutoConfig { val.Enabled = b.boolValWithDefault(raw.Enabled, false) val.IntroToken = b.stringVal(raw.IntroToken) + + // default the IntroToken to the env variable if specified. + if envToken := os.Getenv("CONSUL_INTRO_TOKEN"); envToken != "" { + if val.IntroToken != "" { + b.warn("Both auto_config.intro_token and the CONSUL_INTRO_TOKEN environment variable are set. Using the value from the environment variable") + } + + val.IntroToken = envToken + } val.IntroTokenFile = b.stringVal(raw.IntroTokenFile) // These can be go-discover values and so don't have to resolve fully yet val.ServerAddresses = b.expandAllOptionalAddrs("auto_config.server_addresses", raw.ServerAddresses) @@ -1995,9 +2004,9 @@ func (b *Builder) validateAutoConfig(rt RuntimeConfig) error { // When both are set we will prefer the given value over the file. if autoconf.IntroToken != "" && autoconf.IntroTokenFile != "" { - b.warn("auto_config.intro_token and auto_config.intro_token_file are both set. Using the value of auto_config.intro_token") + b.warn("Both an intro token and intro token file are set. The intro token will be used instead of the file") } else if autoconf.IntroToken == "" && autoconf.IntroTokenFile == "" { - return fmt.Errorf("one of auto_config.intro_token or auto_config.intro_token_file must be set to enable auto_config") + return fmt.Errorf("One of auto_config.intro_token, auto_config.intro_token_file or the CONSUL_INTRO_TOKEN environment variable must be set to enable auto_config") } if len(autoconf.ServerAddresses) == 0 { diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index de84f9b6b0..cd323799f1 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -3846,7 +3846,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { "server_addresses": ["198.18.0.1"] } }`}, - err: "one of auto_config.intro_token or auto_config.intro_token_file must be set to enable auto_config", + err: "One of auto_config.intro_token, auto_config.intro_token_file or the CONSUL_INTRO_TOKEN environment variable must be set to enable auto_config", }, { @@ -3898,7 +3898,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }`}, warns: []string{ "Cannot parse ip \"invalid\" from auto_config.ip_sans", - "auto_config.intro_token and auto_config.intro_token_file are both set. Using the value of auto_config.intro_token", + "Both an intro token and intro token file are set. The intro token will be used instead of the file", }, patch: func(rt *RuntimeConfig) { rt.AutoConfig.Enabled = true diff --git a/agent/consul/auto_encrypt.go b/agent/consul/auto_encrypt.go index 3beace5984..4f176933ff 100644 --- a/agent/consul/auto_encrypt.go +++ b/agent/consul/auto_encrypt.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "net" "strings" @@ -18,7 +19,7 @@ const ( retryJitterWindow = 30 * time.Second ) -func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token string, interruptCh chan struct{}) (*structs.SignedResponse, string, error) { +func (c *Client) RequestAutoEncryptCerts(ctx context.Context, servers []string, port int, token string) (*structs.SignedResponse, string, error) { errFn := func(err error) (*structs.SignedResponse, string, error) { return nil, "", err } @@ -92,8 +93,8 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin attempts := 0 for { select { - case <-interruptCh: - return errFn(fmt.Errorf("aborting AutoEncrypt because interrupted")) + case <-ctx.Done(): + return errFn(fmt.Errorf("aborting AutoEncrypt because interrupted: %w", ctx.Err())) default: } @@ -124,8 +125,8 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin select { case <-time.After(interval): continue - case <-interruptCh: - return errFn(fmt.Errorf("aborting AutoEncrypt because interrupted")) + case <-ctx.Done(): + return errFn(fmt.Errorf("aborting AutoEncrypt because interrupted: %w", ctx.Err())) case <-c.shutdownCh: return errFn(fmt.Errorf("aborting AutoEncrypt because shutting down")) } diff --git a/agent/consul/auto_encrypt_test.go b/agent/consul/auto_encrypt_test.go index 5bc8d8865b..85ed253f4e 100644 --- a/agent/consul/auto_encrypt_test.go +++ b/agent/consul/auto_encrypt_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "net" "os" "testing" @@ -90,11 +91,14 @@ func TestAutoEncrypt_RequestAutoEncryptCerts(t *testing.T) { servers := []string{"localhost"} port := 8301 token := "" - interruptCh := make(chan struct{}) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(75*time.Millisecond)) + defer cancel() + doneCh := make(chan struct{}) var err error go func() { - _, _, err = c1.RequestAutoEncryptCerts(servers, port, token, interruptCh) + _, _, err = c1.RequestAutoEncryptCerts(ctx, servers, port, token) close(doneCh) }() select { @@ -104,9 +108,8 @@ func TestAutoEncrypt_RequestAutoEncryptCerts(t *testing.T) { // in the setup phase before entering the for loop in // RequestAutoEncryptCerts. require.NoError(t, err) - case <-time.After(50 * time.Millisecond): + case <-ctx.Done(): // this is the happy case since auto encrypt is in its loop to // try to request certs. - interruptCh <- struct{}{} } } diff --git a/agent/testagent.go b/agent/testagent.go index 99dbdac923..635ae6e581 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -2,6 +2,7 @@ package agent import ( "bytes" + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -258,7 +259,7 @@ func (a *TestAgent) Start(t *testing.T) (err error) { id := string(a.Config.NodeID) - if err := agent.Start(); err != nil { + if err := agent.Start(context.Background()); err != nil { cleanupTmpDir() agent.ShutdownAgent() agent.ShutdownEndpoints() diff --git a/command/agent/agent.go b/command/agent/agent.go index 9cce01cdda..58878bb860 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1,6 +1,7 @@ package agent import ( + "context" "flag" "fmt" "os" @@ -212,16 +213,17 @@ func (c *cmd) run(args []string) int { // wait for signal signalCh := make(chan os.Signal, 10) - stopCh := make(chan struct{}) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE) + ctx, cancel := context.WithCancel(context.Background()) + go func() { for { var sig os.Signal select { case s := <-signalCh: sig = s - case <-stopCh: + case <-ctx.Done(): return } @@ -235,18 +237,16 @@ func (c *cmd) run(args []string) int { default: c.logger.Info("Caught", "signal", sig) - agent.InterruptStartCh <- struct{}{} + cancel() return } } }() - err = agent.Start() + err = agent.Start(ctx) signal.Stop(signalCh) - select { - case stopCh <- struct{}{}: - default: - } + cancel() + if err != nil { c.logger.Error("Error starting agent", "error", err) return 1