diff --git a/command/agent/command.go b/command/agent/command.go index 6410383eaf..2a580eaf79 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -14,6 +14,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-syslog" "github.com/hashicorp/logutils" "github.com/mitchellh/cli" @@ -37,6 +38,7 @@ type Command struct { ShutdownCh <-chan struct{} args []string logFilter *logutils.LevelFilter + logOutput io.Writer agent *Agent rpcServer *AgentRPC httpServer *HTTPServer @@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config { return nil } + // Compile all the watches + for _, params := range config.Watches { + // Parse the watches, excluding the handler + wp, err := watch.ParseExempt(params, []string{"handler"}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err)) + return nil + } + + // Get the handler + if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err)) + return nil + } + + // Store the watch plan + config.WatchPlans = append(config.WatchPlans, wp) + } + // Warn if we are in expect mode if config.BootstrapExpect == 1 { c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.") @@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri } else { logOutput = io.MultiWriter(c.logFilter, logWriter) } + c.logOutput = logOutput return logGate, logWriter, logOutput } @@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int { } } + // Get the new client listener addr + httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Register the watches + for _, wp := range config.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + // Let the agent know we've finished registration c.agent.StartSync() @@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config { } } + // Get the new client listener addr + httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Deregister the old watches + for _, wp := range config.WatchPlans { + wp.Stop() + } + + // Register the new watches + for _, wp := range newConf.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + return newConf } diff --git a/command/agent/config.go b/command/agent/config.go index cba6f3e8f2..f91853cdde 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/watch" "github.com/mitchellh/mapstructure" ) @@ -256,6 +257,9 @@ type Config struct { // VersionPrerelease is a label for pre-release builds VersionPrerelease string `mapstructure:"-"` + + // WatchPlans contains the compiled watches + WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"` } type dirEnts []os.FileInfo @@ -307,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) { return &net.TCPAddr{IP: ip, Port: port}, nil } +// ClientListenerAddr is used to format an address for a +// port on a ClientAddr, handling the zero IP. +func (c *Config) ClientListenerAddr(port int) (string, error) { + addr, err := c.ClientListener(port) + if err != nil { + return "", err + } + if addr.IP.IsUnspecified() { + addr.IP = net.ParseIP("127.0.0.1") + } + return addr.String(), nil +} + // DecodeConfig reads the configuration from the given reader in JSON // format and decodes it into a proper Config structure. func DecodeConfig(r io.Reader) (*Config, error) { @@ -656,6 +673,9 @@ func MergeConfig(a, b *Config) *Config { if len(b.Watches) != 0 { result.Watches = append(result.Watches, b.Watches...) } + if len(b.WatchPlans) != 0 { + result.WatchPlans = append(result.WatchPlans, b.WatchPlans...) + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/dns.go b/command/agent/dns.go index 5e4480ac23..18e2b928b4 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -280,7 +280,7 @@ PARSE: // _name._tag.service.consul d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp) - // Consul 0.3 and prior format for SRV queries + // Consul 0.3 and prior format for SRV queries } else { // Support "." in the label, re-join all the parts diff --git a/command/agent/watch_handler.go b/command/agent/watch_handler.go new file mode 100644 index 0000000000..ef9f8a9cf4 --- /dev/null +++ b/command/agent/watch_handler.go @@ -0,0 +1,88 @@ +package agent + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "os" + "os/exec" + "runtime" + "strconv" + + "github.com/armon/circbuf" + "github.com/hashicorp/consul/watch" +) + +const ( + // Limit the size of a watch handlers's output to the + // last WatchBufSize. Prevents an enormous buffer + // from being captured + WatchBufSize = 4 * 1024 // 4KB +) + +// verifyWatchHandler does the pre-check for our handler configuration +func verifyWatchHandler(params interface{}) error { + if params == nil { + return fmt.Errorf("Must provide watch handler") + } + _, ok := params.(string) + if !ok { + return fmt.Errorf("Watch handler must be a string") + } + return nil +} + +// makeWatchHandler returns a handler for the given watch +func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc { + script := params.(string) + logger := log.New(logOutput, "", log.LstdFlags) + fn := func(idx uint64, data interface{}) { + // Determine the shell invocation based on OS + var shell, flag string + if runtime.GOOS == "windows" { + shell = "cmd" + flag = "/C" + } else { + shell = "/bin/sh" + flag = "-c" + } + + // Create the command + cmd := exec.Command(shell, flag, script) + cmd.Env = append(os.Environ(), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + ) + + // Collect the output + output, _ := circbuf.NewBuffer(WatchBufSize) + cmd.Stdout = output + cmd.Stderr = output + + // Setup the input + var inp bytes.Buffer + enc := json.NewEncoder(&inp) + if err := enc.Encode(data); err != nil { + logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err) + return + } + cmd.Stdin = &inp + + // Run the handler + if err := cmd.Run(); err != nil { + logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err) + } + + // Get the output, add a message about truncation + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + + // Log the output + logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr) + } + return fn +} diff --git a/watch/plan.go b/watch/plan.go index 07576dfe11..a6dc057730 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -3,6 +3,7 @@ package watch import ( "fmt" "log" + "os" "reflect" "time" @@ -32,6 +33,13 @@ func (p *WatchPlan) Run(address string) error { } p.client = client + // Create the logger + output := p.LogOutput + if output == nil { + output = os.Stderr + } + logger := log.New(output, "", log.LstdFlags) + // Loop until we are canceled failures := 0 OUTER: @@ -47,14 +55,14 @@ OUTER: // Handle an error in the watch function if err != nil { - log.Printf("consul.watch: Watch (type: %s) errored: %v", p.Type, err) - // Perform an exponential backoff failures++ retry := retryInterval * time.Duration(failures*failures) if retry > maxBackoffTime { retry = maxBackoffTime } + logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v", + p.Type, err, retry) select { case <-time.After(retry): continue OUTER diff --git a/watch/watch.go b/watch/watch.go index 58281faae1..0b0a69a32e 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "io" "sync" "github.com/armon/consul-api" @@ -16,8 +17,10 @@ type WatchPlan struct { Token string Type string Exempt map[string]interface{} - Func WatchFunc - Handler HandlerFunc + + Func WatchFunc + Handler HandlerFunc + LogOutput io.Writer address string client *consulapi.Client