agent: First pass at agent-based watches

This commit is contained in:
Armon Dadgar 2014-08-21 13:09:13 -07:00
parent e877753162
commit 4b547a43d0
6 changed files with 185 additions and 5 deletions

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
@ -37,6 +38,7 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
rpcServer *AgentRPC
httpServer *HTTPServer
@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config {
return nil
}
// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}
// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}
// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}
// Warn if we are in expect mode
if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
return logGate, logWriter, logOutput
}
@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int {
}
}
// Get the new client listener addr
httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Register the watches
for _, wp := range config.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
// Let the agent know we've finished registration
c.agent.StartSync()
@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config {
}
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
return newConf
}

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)
@ -256,6 +257,9 @@ type Config struct {
// VersionPrerelease is a label for pre-release builds
VersionPrerelease string `mapstructure:"-"`
// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
}
type dirEnts []os.FileInfo
@ -307,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) {
return &net.TCPAddr{IP: ip, Port: port}, nil
}
// ClientListenerAddr is used to format an address for a
// port on a ClientAddr, handling the zero IP.
func (c *Config) ClientListenerAddr(port int) (string, error) {
addr, err := c.ClientListener(port)
if err != nil {
return "", err
}
if addr.IP.IsUnspecified() {
addr.IP = net.ParseIP("127.0.0.1")
}
return addr.String(), nil
}
// DecodeConfig reads the configuration from the given reader in JSON
// format and decodes it into a proper Config structure.
func DecodeConfig(r io.Reader) (*Config, error) {
@ -656,6 +673,9 @@ func MergeConfig(a, b *Config) *Config {
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}
if len(b.WatchPlans) != 0 {
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
}
// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))

View File

@ -280,7 +280,7 @@ PARSE:
// _name._tag.service.consul
d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp)
// Consul 0.3 and prior format for SRV queries
// Consul 0.3 and prior format for SRV queries
} else {
// Support "." in the label, re-join all the parts

View File

@ -0,0 +1,88 @@
package agent
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/exec"
"runtime"
"strconv"
"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
)
const (
// Limit the size of a watch handlers's output to the
// last WatchBufSize. Prevents an enormous buffer
// from being captured
WatchBufSize = 4 * 1024 // 4KB
)
// verifyWatchHandler does the pre-check for our handler configuration
func verifyWatchHandler(params interface{}) error {
if params == nil {
return fmt.Errorf("Must provide watch handler")
}
_, ok := params.(string)
if !ok {
return fmt.Errorf("Watch handler must be a string")
}
return nil
}
// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Determine the shell invocation based on OS
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
// Create the command
cmd := exec.Command(shell, flag, script)
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)
// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
cmd.Stdout = output
cmd.Stderr = output
// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err)
return
}
cmd.Stdin = &inp
// Run the handler
if err := cmd.Run(); err != nil {
logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err)
}
// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
// Log the output
logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr)
}
return fn
}

View File

@ -3,6 +3,7 @@ package watch
import (
"fmt"
"log"
"os"
"reflect"
"time"
@ -32,6 +33,13 @@ func (p *WatchPlan) Run(address string) error {
}
p.client = client
// Create the logger
output := p.LogOutput
if output == nil {
output = os.Stderr
}
logger := log.New(output, "", log.LstdFlags)
// Loop until we are canceled
failures := 0
OUTER:
@ -47,14 +55,14 @@ OUTER:
// Handle an error in the watch function
if err != nil {
log.Printf("consul.watch: Watch (type: %s) errored: %v", p.Type, err)
// Perform an exponential backoff
failures++
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v",
p.Type, err, retry)
select {
case <-time.After(retry):
continue OUTER

View File

@ -2,6 +2,7 @@ package watch
import (
"fmt"
"io"
"sync"
"github.com/armon/consul-api"
@ -16,8 +17,10 @@ type WatchPlan struct {
Token string
Type string
Exempt map[string]interface{}
Func WatchFunc
Handler HandlerFunc
Func WatchFunc
Handler HandlerFunc
LogOutput io.Writer
address string
client *consulapi.Client