From 3865f14a255108e7b3fd7e276c9309d97f911f39 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Fri, 19 May 2017 17:51:39 +0200 Subject: [PATCH] agent: simplify agent creation This patch creates an agent with just a config struct and allows for other fields to be set as required. --- command/agent/acl_test.go | 11 +++---- command/agent/agent.go | 41 +++++++++--------------- command/agent/agent_endpoint.go | 10 +++--- command/agent/agent_test.go | 57 +++++++++++++++++++++------------ command/agent/command.go | 14 ++++---- command/reload_test.go | 5 ++- command/util_test.go | 13 ++++---- 7 files changed, 79 insertions(+), 72 deletions(-) diff --git a/command/agent/acl_test.go b/command/agent/acl_test.go index 7f7c770013..b9799306be 100644 --- a/command/agent/acl_test.go +++ b/command/agent/acl_test.go @@ -17,14 +17,11 @@ import ( ) func TestACL_Bad_Config(t *testing.T) { - config := nextConfig() - config.ACLDownPolicy = "nope" + c := nextConfig() + c.ACLDownPolicy = "nope" + c.DataDir = testutil.TempDir(t, "agent") - var err error - config.DataDir = testutil.TempDir(t, "agent") - defer os.RemoveAll(config.DataDir) - - _, err = Create(config, nil, nil, nil) + _, err := NewAgent(c) if err == nil || !strings.Contains(err.Error(), "invalid ACL down policy") { t.Fatalf("err: %v", err) } diff --git a/command/agent/agent.go b/command/agent/agent.go index 1c492bca58..f20a347264 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -83,10 +83,10 @@ type Agent struct { logger *log.Logger // Output sink for logs - logOutput io.Writer + LogOutput io.Writer // Used for streaming logs to - logWriter *logger.LogWriter + LogWriter *logger.LogWriter // delegate is either a *consul.Server or *consul.Client // depending on the configuration @@ -160,23 +160,7 @@ type Agent struct { wgServers sync.WaitGroup } -// Create is used to create a new Agent. Returns -// the agent or potentially an error. -func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { - a, err := NewAgent(c, logOutput, logWriter, reloadCh) - if err != nil { - return nil, err - } - if err := a.Start(); err != nil { - return nil, err - } - return a, nil -} - -func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { - if logOutput == nil { - logOutput = os.Stderr - } +func NewAgent(c *Config) (*Agent, error) { if c.Datacenter == "" { return nil, fmt.Errorf("Must configure a Datacenter") } @@ -199,8 +183,6 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa a := &Agent{ config: c, acls: acls, - logOutput: logOutput, - logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), checkMonitors: make(map[types.CheckID]*CheckMonitor), checkTTLs: make(map[types.CheckID]*CheckTTL), @@ -209,7 +191,7 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa checkDockers: make(map[types.CheckID]*CheckDocker), eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), - reloadCh: reloadCh, + reloadCh: make(chan chan error), shutdownCh: make(chan struct{}), endpoints: make(map[string]string), dnsAddr: dnsAddr, @@ -224,7 +206,10 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa func (a *Agent) Start() error { c := a.config - a.logger = log.New(a.logOutput, "", log.LstdFlags) + if a.LogOutput == nil { + a.LogOutput = os.Stderr + } + a.logger = log.New(a.LogOutput, "", log.LstdFlags) // Retrieve or generate the node ID before setting up the rest of the // agent, which depends on it. @@ -294,7 +279,7 @@ func (a *Agent) Start() error { // start dns server if c.Ports.DNS > 0 { - srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors) + srv, err := NewDNSServer(a, &c.DNSConfig, a.LogOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors) if err != nil { return fmt.Errorf("error starting DNS server: %s", err) } @@ -659,7 +644,7 @@ func (a *Agent) consulConfig() (*consul.Config, error) { } // Setup the loggers - base.LogOutput = a.logOutput + base.LogOutput = a.LogOutput return base, nil } @@ -1040,6 +1025,12 @@ func (a *Agent) Shutdown() error { return err } +// ReloadCh is used to return a channel that can be +// used for triggering reloads and returning a response. +func (a *Agent) ReloadCh() chan chan error { + return a.reloadCh +} + // ShutdownCh is used to return a channel that can be // selected to wait for the agent to perform a shutdown. func (a *Agent) ShutdownCh() <-chan struct{} { diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index f67805a312..a5871af5f9 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -74,14 +74,14 @@ func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (i // Trigger the reload errCh := make(chan error, 0) select { - case <-s.agent.ShutdownCh(): + case <-s.agent.shutdownCh: return nil, fmt.Errorf("Agent was shutdown before reload could be completed") case s.agent.reloadCh <- errCh: } // Wait for the result of the reload, or for the agent to shutdown select { - case <-s.agent.ShutdownCh(): + case <-s.agent.shutdownCh: return nil, fmt.Errorf("Agent was shutdown before reload could be completed") case err := <-errCh: return nil, err @@ -656,15 +656,15 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( logCh: make(chan string, 512), logger: s.agent.logger, } - s.agent.logWriter.RegisterHandler(handler) - defer s.agent.logWriter.DeregisterHandler(handler) + s.agent.LogWriter.RegisterHandler(handler) + defer s.agent.LogWriter.DeregisterHandler(handler) notify := resp.(http.CloseNotifier).CloseNotify() // Stream logs until the connection is closed. for { select { case <-notify: - s.agent.logWriter.DeregisterHandler(handler) + s.agent.LogWriter.DeregisterHandler(handler) if handler.droppedCount > 0 { s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 6726984cb8..b98691ab96 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -98,38 +98,43 @@ func nextConfig() *Config { } func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) { - dir := testutil.TempDir(t, "agent") - - conf.DataDir = dir - agent, err := Create(conf, l, writer, nil) + conf.DataDir = testutil.TempDir(t, "agent") + agent, err := NewAgent(conf) if err != nil { - os.RemoveAll(dir) + os.RemoveAll(conf.DataDir) t.Fatalf(fmt.Sprintf("err: %v", err)) } - - return dir, agent + agent.LogOutput = l + agent.LogWriter = writer + if err := agent.Start(); err != nil { + os.RemoveAll(conf.DataDir) + t.Fatalf(fmt.Sprintf("err: %v", err)) + } + return conf.DataDir, agent } func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { - dir := testutil.TempDir(t, "agent") + conf.DataDir = testutil.TempDir(t, "agent") - conf.DataDir = dir - - fileLAN := filepath.Join(dir, serfLANKeyring) + fileLAN := filepath.Join(conf.DataDir, serfLANKeyring) if err := initKeyring(fileLAN, key); err != nil { t.Fatalf("err: %s", err) } - fileWAN := filepath.Join(dir, serfWANKeyring) + fileWAN := filepath.Join(conf.DataDir, serfWANKeyring) if err := initKeyring(fileWAN, key); err != nil { t.Fatalf("err: %s", err) } - agent, err := Create(conf, nil, nil, nil) + agent, err := NewAgent(conf) if err != nil { - t.Fatalf("err: %s", err) + os.RemoveAll(conf.DataDir) + t.Fatal("Error creating agent:", err) } - - return dir, agent + if err := agent.Start(); err != nil { + os.RemoveAll(conf.DataDir) + t.Fatal("Error starting agent:", err) + } + return conf.DataDir, agent } func makeAgent(t *testing.T, conf *Config) (string, *Agent) { @@ -1061,10 +1066,13 @@ func TestAgent_PersistService(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil, nil, nil) + agent2, err := NewAgent(config) if err != nil { t.Fatalf("err: %s", err) } + if err := agent2.Start(); err != nil { + t.Fatal(err) + } defer agent2.Shutdown() restored, ok := agent2.state.services[svc.ID] @@ -1195,10 +1203,13 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } config.Services = []*ServiceDefinition{svc2} - agent2, err := Create(config, nil, nil, nil) + agent2, err := NewAgent(config) if err != nil { t.Fatalf("err: %s", err) } + if err := agent2.Start(); err != nil { + t.Fatal(err) + } defer agent2.Shutdown() file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc1.ID)) @@ -1288,10 +1299,13 @@ func TestAgent_PersistCheck(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil, nil, nil) + agent2, err := NewAgent(config) if err != nil { t.Fatalf("err: %s", err) } + if err := agent2.Start(); err != nil { + t.Fatal(err) + } defer agent2.Shutdown() result, ok := agent2.state.checks[check.CheckID] @@ -1379,10 +1393,13 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } config.Checks = []*CheckDefinition{check2} - agent2, err := Create(config, nil, nil, nil) + agent2, err := NewAgent(config) if err != nil { t.Fatalf("err: %s", err) } + if err := agent2.Start(); err != nil { + t.Fatal(err) + } defer agent2.Shutdown() file := filepath.Join(agent.config.DataDir, checksDir, checkIDHash(check1.CheckID)) diff --git a/command/agent/command.go b/command/agent/command.go index fa7ff547ef..2b757037f3 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -45,7 +45,6 @@ type Command struct { VersionPrerelease string HumanVersion string ShutdownCh <-chan struct{} - configReloadCh chan chan error args []string logFilter *logutils.LevelFilter logOutput io.Writer @@ -644,9 +643,6 @@ func (c *Command) Run(args []string) int { c.logFilter = logFilter c.logOutput = logOutput - // Setup the channel for triggering config reloads - c.configReloadCh = make(chan chan error) - // Setup telemetry // Aggregate on 10 second intervals for 1 minute. Expose the // metrics over stderr when there is a SIGUSR1 received. @@ -741,8 +737,14 @@ func (c *Command) Run(args []string) int { // Create the agent c.UI.Output("Starting Consul agent...") - agent, err := Create(config, logOutput, logWriter, c.configReloadCh) + agent, err := NewAgent(config) if err != nil { + c.UI.Error(fmt.Sprintf("Error creating agent: %s", err)) + return 1 + } + agent.LogOutput = logOutput + agent.LogWriter = logWriter + if err := agent.Start(); err != nil { c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) return 1 } @@ -865,7 +867,7 @@ WAIT: select { case s := <-signalCh: sig = s - case ch := <-c.configReloadCh: + case ch := <-c.agent.reloadCh: sig = syscall.SIGHUP reloadErrCh = ch case <-c.ShutdownCh: diff --git a/command/reload_test.go b/command/reload_test.go index 8107a8c360..17228edc09 100644 --- a/command/reload_test.go +++ b/command/reload_test.go @@ -13,13 +13,12 @@ func TestReloadCommand_implements(t *testing.T) { } func TestReloadCommandRun(t *testing.T) { - reloadCh := make(chan chan error) - a1 := testAgentWithConfigReload(t, nil, reloadCh) + a1 := testAgentWithConfig(t, nil) defer a1.Shutdown() // Setup a dummy response to errCh to simulate a successful reload go func() { - errCh := <-reloadCh + errCh := <-a1.agent.ReloadCh() errCh <- nil }() diff --git a/command/util_test.go b/command/util_test.go index 4cace08a49..27184dd40b 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -53,20 +53,21 @@ func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) { } func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server { - return testAgentWithConfigReload(t, cb, nil) -} - -func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server { conf := nextConfig() if cb != nil { cb(conf) } conf.DataDir = testutil.TempDir(t, "agent") - a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh) + a, err := agent.NewAgent(conf) if err != nil { os.RemoveAll(conf.DataDir) - t.Fatalf("err: %v", err) + t.Fatal("Error creating agent:", err) + } + a.LogOutput = logger.NewLogWriter(512) + if err := a.Start(); err != nil { + os.RemoveAll(conf.DataDir) + t.Fatalf("Error starting agent: %v", err) } conf.Addresses.HTTP = "127.0.0.1"