mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 19:50:36 +00:00
agent: simplify agent creation
This patch creates an agent with just a config struct and allows for other fields to be set as required.
This commit is contained in:
parent
82650f73e3
commit
3865f14a25
@ -17,14 +17,11 @@ import (
|
||||
)
|
||||
|
||||
func TestACL_Bad_Config(t *testing.T) {
|
||||
config := nextConfig()
|
||||
config.ACLDownPolicy = "nope"
|
||||
c := nextConfig()
|
||||
c.ACLDownPolicy = "nope"
|
||||
c.DataDir = testutil.TempDir(t, "agent")
|
||||
|
||||
var err error
|
||||
config.DataDir = testutil.TempDir(t, "agent")
|
||||
defer os.RemoveAll(config.DataDir)
|
||||
|
||||
_, err = Create(config, nil, nil, nil)
|
||||
_, err := NewAgent(c)
|
||||
if err == nil || !strings.Contains(err.Error(), "invalid ACL down policy") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -83,10 +83,10 @@ type Agent struct {
|
||||
logger *log.Logger
|
||||
|
||||
// Output sink for logs
|
||||
logOutput io.Writer
|
||||
LogOutput io.Writer
|
||||
|
||||
// Used for streaming logs to
|
||||
logWriter *logger.LogWriter
|
||||
LogWriter *logger.LogWriter
|
||||
|
||||
// delegate is either a *consul.Server or *consul.Client
|
||||
// depending on the configuration
|
||||
@ -160,23 +160,7 @@ type Agent struct {
|
||||
wgServers sync.WaitGroup
|
||||
}
|
||||
|
||||
// Create is used to create a new Agent. Returns
|
||||
// the agent or potentially an error.
|
||||
func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
|
||||
a, err := NewAgent(c, logOutput, logWriter, reloadCh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := a.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
|
||||
if logOutput == nil {
|
||||
logOutput = os.Stderr
|
||||
}
|
||||
func NewAgent(c *Config) (*Agent, error) {
|
||||
if c.Datacenter == "" {
|
||||
return nil, fmt.Errorf("Must configure a Datacenter")
|
||||
}
|
||||
@ -199,8 +183,6 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa
|
||||
a := &Agent{
|
||||
config: c,
|
||||
acls: acls,
|
||||
logOutput: logOutput,
|
||||
logWriter: logWriter,
|
||||
checkReapAfter: make(map[types.CheckID]time.Duration),
|
||||
checkMonitors: make(map[types.CheckID]*CheckMonitor),
|
||||
checkTTLs: make(map[types.CheckID]*CheckTTL),
|
||||
@ -209,7 +191,7 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa
|
||||
checkDockers: make(map[types.CheckID]*CheckDocker),
|
||||
eventCh: make(chan serf.UserEvent, 1024),
|
||||
eventBuf: make([]*UserEvent, 256),
|
||||
reloadCh: reloadCh,
|
||||
reloadCh: make(chan chan error),
|
||||
shutdownCh: make(chan struct{}),
|
||||
endpoints: make(map[string]string),
|
||||
dnsAddr: dnsAddr,
|
||||
@ -224,7 +206,10 @@ func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloa
|
||||
func (a *Agent) Start() error {
|
||||
c := a.config
|
||||
|
||||
a.logger = log.New(a.logOutput, "", log.LstdFlags)
|
||||
if a.LogOutput == nil {
|
||||
a.LogOutput = os.Stderr
|
||||
}
|
||||
a.logger = log.New(a.LogOutput, "", log.LstdFlags)
|
||||
|
||||
// Retrieve or generate the node ID before setting up the rest of the
|
||||
// agent, which depends on it.
|
||||
@ -294,7 +279,7 @@ func (a *Agent) Start() error {
|
||||
|
||||
// start dns server
|
||||
if c.Ports.DNS > 0 {
|
||||
srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors)
|
||||
srv, err := NewDNSServer(a, &c.DNSConfig, a.LogOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error starting DNS server: %s", err)
|
||||
}
|
||||
@ -659,7 +644,7 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
|
||||
}
|
||||
|
||||
// Setup the loggers
|
||||
base.LogOutput = a.logOutput
|
||||
base.LogOutput = a.LogOutput
|
||||
return base, nil
|
||||
}
|
||||
|
||||
@ -1040,6 +1025,12 @@ func (a *Agent) Shutdown() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ReloadCh is used to return a channel that can be
|
||||
// used for triggering reloads and returning a response.
|
||||
func (a *Agent) ReloadCh() chan chan error {
|
||||
return a.reloadCh
|
||||
}
|
||||
|
||||
// ShutdownCh is used to return a channel that can be
|
||||
// selected to wait for the agent to perform a shutdown.
|
||||
func (a *Agent) ShutdownCh() <-chan struct{} {
|
||||
|
@ -74,14 +74,14 @@ func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (i
|
||||
// Trigger the reload
|
||||
errCh := make(chan error, 0)
|
||||
select {
|
||||
case <-s.agent.ShutdownCh():
|
||||
case <-s.agent.shutdownCh:
|
||||
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
||||
case s.agent.reloadCh <- errCh:
|
||||
}
|
||||
|
||||
// Wait for the result of the reload, or for the agent to shutdown
|
||||
select {
|
||||
case <-s.agent.ShutdownCh():
|
||||
case <-s.agent.shutdownCh:
|
||||
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
||||
case err := <-errCh:
|
||||
return nil, err
|
||||
@ -656,15 +656,15 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
|
||||
logCh: make(chan string, 512),
|
||||
logger: s.agent.logger,
|
||||
}
|
||||
s.agent.logWriter.RegisterHandler(handler)
|
||||
defer s.agent.logWriter.DeregisterHandler(handler)
|
||||
s.agent.LogWriter.RegisterHandler(handler)
|
||||
defer s.agent.LogWriter.DeregisterHandler(handler)
|
||||
notify := resp.(http.CloseNotifier).CloseNotify()
|
||||
|
||||
// Stream logs until the connection is closed.
|
||||
for {
|
||||
select {
|
||||
case <-notify:
|
||||
s.agent.logWriter.DeregisterHandler(handler)
|
||||
s.agent.LogWriter.DeregisterHandler(handler)
|
||||
if handler.droppedCount > 0 {
|
||||
s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount)
|
||||
}
|
||||
|
@ -98,38 +98,43 @@ func nextConfig() *Config {
|
||||
}
|
||||
|
||||
func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) {
|
||||
dir := testutil.TempDir(t, "agent")
|
||||
|
||||
conf.DataDir = dir
|
||||
agent, err := Create(conf, l, writer, nil)
|
||||
conf.DataDir = testutil.TempDir(t, "agent")
|
||||
agent, err := NewAgent(conf)
|
||||
if err != nil {
|
||||
os.RemoveAll(dir)
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatalf(fmt.Sprintf("err: %v", err))
|
||||
}
|
||||
|
||||
return dir, agent
|
||||
agent.LogOutput = l
|
||||
agent.LogWriter = writer
|
||||
if err := agent.Start(); err != nil {
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatalf(fmt.Sprintf("err: %v", err))
|
||||
}
|
||||
return conf.DataDir, agent
|
||||
}
|
||||
|
||||
func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) {
|
||||
dir := testutil.TempDir(t, "agent")
|
||||
conf.DataDir = testutil.TempDir(t, "agent")
|
||||
|
||||
conf.DataDir = dir
|
||||
|
||||
fileLAN := filepath.Join(dir, serfLANKeyring)
|
||||
fileLAN := filepath.Join(conf.DataDir, serfLANKeyring)
|
||||
if err := initKeyring(fileLAN, key); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
fileWAN := filepath.Join(dir, serfWANKeyring)
|
||||
fileWAN := filepath.Join(conf.DataDir, serfWANKeyring)
|
||||
if err := initKeyring(fileWAN, key); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
agent, err := Create(conf, nil, nil, nil)
|
||||
agent, err := NewAgent(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatal("Error creating agent:", err)
|
||||
}
|
||||
|
||||
return dir, agent
|
||||
if err := agent.Start(); err != nil {
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatal("Error starting agent:", err)
|
||||
}
|
||||
return conf.DataDir, agent
|
||||
}
|
||||
|
||||
func makeAgent(t *testing.T, conf *Config) (string, *Agent) {
|
||||
@ -1061,10 +1066,13 @@ func TestAgent_PersistService(t *testing.T) {
|
||||
agent.Shutdown()
|
||||
|
||||
// Should load it back during later start
|
||||
agent2, err := Create(config, nil, nil, nil)
|
||||
agent2, err := NewAgent(config)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := agent2.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer agent2.Shutdown()
|
||||
|
||||
restored, ok := agent2.state.services[svc.ID]
|
||||
@ -1195,10 +1203,13 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
|
||||
}
|
||||
|
||||
config.Services = []*ServiceDefinition{svc2}
|
||||
agent2, err := Create(config, nil, nil, nil)
|
||||
agent2, err := NewAgent(config)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := agent2.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer agent2.Shutdown()
|
||||
|
||||
file := filepath.Join(agent.config.DataDir, servicesDir, stringHash(svc1.ID))
|
||||
@ -1288,10 +1299,13 @@ func TestAgent_PersistCheck(t *testing.T) {
|
||||
agent.Shutdown()
|
||||
|
||||
// Should load it back during later start
|
||||
agent2, err := Create(config, nil, nil, nil)
|
||||
agent2, err := NewAgent(config)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := agent2.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer agent2.Shutdown()
|
||||
|
||||
result, ok := agent2.state.checks[check.CheckID]
|
||||
@ -1379,10 +1393,13 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
|
||||
}
|
||||
|
||||
config.Checks = []*CheckDefinition{check2}
|
||||
agent2, err := Create(config, nil, nil, nil)
|
||||
agent2, err := NewAgent(config)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := agent2.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer agent2.Shutdown()
|
||||
|
||||
file := filepath.Join(agent.config.DataDir, checksDir, checkIDHash(check1.CheckID))
|
||||
|
@ -45,7 +45,6 @@ type Command struct {
|
||||
VersionPrerelease string
|
||||
HumanVersion string
|
||||
ShutdownCh <-chan struct{}
|
||||
configReloadCh chan chan error
|
||||
args []string
|
||||
logFilter *logutils.LevelFilter
|
||||
logOutput io.Writer
|
||||
@ -644,9 +643,6 @@ func (c *Command) Run(args []string) int {
|
||||
c.logFilter = logFilter
|
||||
c.logOutput = logOutput
|
||||
|
||||
// Setup the channel for triggering config reloads
|
||||
c.configReloadCh = make(chan chan error)
|
||||
|
||||
// Setup telemetry
|
||||
// Aggregate on 10 second intervals for 1 minute. Expose the
|
||||
// metrics over stderr when there is a SIGUSR1 received.
|
||||
@ -741,8 +737,14 @@ func (c *Command) Run(args []string) int {
|
||||
|
||||
// Create the agent
|
||||
c.UI.Output("Starting Consul agent...")
|
||||
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
|
||||
agent, err := NewAgent(config)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error creating agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
agent.LogOutput = logOutput
|
||||
agent.LogWriter = logWriter
|
||||
if err := agent.Start(); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
@ -865,7 +867,7 @@ WAIT:
|
||||
select {
|
||||
case s := <-signalCh:
|
||||
sig = s
|
||||
case ch := <-c.configReloadCh:
|
||||
case ch := <-c.agent.reloadCh:
|
||||
sig = syscall.SIGHUP
|
||||
reloadErrCh = ch
|
||||
case <-c.ShutdownCh:
|
||||
|
@ -13,13 +13,12 @@ func TestReloadCommand_implements(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReloadCommandRun(t *testing.T) {
|
||||
reloadCh := make(chan chan error)
|
||||
a1 := testAgentWithConfigReload(t, nil, reloadCh)
|
||||
a1 := testAgentWithConfig(t, nil)
|
||||
defer a1.Shutdown()
|
||||
|
||||
// Setup a dummy response to errCh to simulate a successful reload
|
||||
go func() {
|
||||
errCh := <-reloadCh
|
||||
errCh := <-a1.agent.ReloadCh()
|
||||
errCh <- nil
|
||||
}()
|
||||
|
||||
|
@ -53,20 +53,21 @@ func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) {
|
||||
}
|
||||
|
||||
func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server {
|
||||
return testAgentWithConfigReload(t, cb, nil)
|
||||
}
|
||||
|
||||
func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server {
|
||||
conf := nextConfig()
|
||||
if cb != nil {
|
||||
cb(conf)
|
||||
}
|
||||
|
||||
conf.DataDir = testutil.TempDir(t, "agent")
|
||||
a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh)
|
||||
a, err := agent.NewAgent(conf)
|
||||
if err != nil {
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatalf("err: %v", err)
|
||||
t.Fatal("Error creating agent:", err)
|
||||
}
|
||||
a.LogOutput = logger.NewLogWriter(512)
|
||||
if err := a.Start(); err != nil {
|
||||
os.RemoveAll(conf.DataDir)
|
||||
t.Fatalf("Error starting agent: %v", err)
|
||||
}
|
||||
|
||||
conf.Addresses.HTTP = "127.0.0.1"
|
||||
|
Loading…
x
Reference in New Issue
Block a user