Merge pull request #298 from hashicorp/f-watch

Adding support for 'watches'
This commit is contained in:
Armon Dadgar 2014-08-21 17:35:40 -07:00
commit 9e13633af8
24 changed files with 1817 additions and 24 deletions

View File

@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/consul/structs"
"log"
"os/exec"
"runtime"
"sync"
"syscall"
"time"
@ -106,18 +105,13 @@ func (c *CheckMonitor) run() {
// check is invoked periodically to perform the script check
func (c *CheckMonitor) check() {
// Determine the shell invocation based on OS
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
// Create the command
cmd := exec.Command(shell, flag, c.Script)
cmd, err := ExecScript(c.Script)
if err != nil {
c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", c.Script, err)
c.Notify.UpdateCheck(c.CheckID, structs.HealthUnknown, err.Error())
return
}
// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)
@ -140,7 +134,7 @@ func (c *CheckMonitor) check() {
time.Sleep(30 * time.Second)
errCh <- fmt.Errorf("Timed out running check '%s'", c.Script)
}()
err := <-errCh
err = <-errCh
// Get the output, add a message about truncation
outputStr := string(output.Bytes())

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
@ -37,6 +38,7 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
rpcServer *AgentRPC
httpServer *HTTPServer
@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config {
return nil
}
// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}
// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}
// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}
// Warn if we are in expect mode
if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
return logGate, logWriter, logOutput
}
@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int {
}
}
// Get the new client listener addr
httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Register the watches
for _, wp := range config.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
// Let the agent know we've finished registration
c.agent.StartSync()
@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config {
}
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}
return newConf
}

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)
@ -229,6 +230,11 @@ type Config struct {
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`
// Watches are used to monitor various endpoints and to invoke a
// handler to act appropriately. These are managed entirely in the
// agent layer using the standard APIs.
Watches []map[string]interface{} `mapstructure:"watches"`
// AEInterval controls the anti-entropy interval. This is how often
// the agent attempts to reconcile it's local state with the server'
// representation of our state. Defaults to every 60s.
@ -251,6 +257,9 @@ type Config struct {
// VersionPrerelease is a label for pre-release builds
VersionPrerelease string `mapstructure:"-"`
// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
}
type dirEnts []os.FileInfo
@ -302,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) {
return &net.TCPAddr{IP: ip, Port: port}, nil
}
// ClientListenerAddr is used to format an address for a
// port on a ClientAddr, handling the zero IP.
func (c *Config) ClientListenerAddr(port int) (string, error) {
addr, err := c.ClientListener(port)
if err != nil {
return "", err
}
if addr.IP.IsUnspecified() {
addr.IP = net.ParseIP("127.0.0.1")
}
return addr.String(), nil
}
// DecodeConfig reads the configuration from the given reader in JSON
// format and decodes it into a proper Config structure.
func DecodeConfig(r io.Reader) (*Config, error) {
@ -648,6 +670,12 @@ func MergeConfig(a, b *Config) *Config {
if b.ACLDefaultPolicy != "" {
result.ACLDefaultPolicy = b.ACLDefaultPolicy
}
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}
if len(b.WatchPlans) != 0 {
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
}
// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))

View File

@ -384,6 +384,27 @@ func TestDecodeConfig(t *testing.T) {
if config.ACLDefaultPolicy != "deny" {
t.Fatalf("bad: %#v", config)
}
// Watches
input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.Watches) != 1 {
t.Fatalf("bad: %#v", config)
}
out := config.Watches[0]
exp := map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
}
if !reflect.DeepEqual(out, exp) {
t.Fatalf("bad: %#v", config)
}
}
func TestDecodeConfig_Service(t *testing.T) {
@ -538,6 +559,13 @@ func TestMergeConfig(t *testing.T) {
ACLTTLRaw: "15s",
ACLDownPolicy: "deny",
ACLDefaultPolicy: "deny",
Watches: []map[string]interface{}{
map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
},
},
}
c := MergeConfig(a, b)

View File

@ -3,6 +3,8 @@ package agent
import (
"math"
"math/rand"
"os/exec"
"runtime"
"time"
)
@ -39,3 +41,17 @@ func strContains(l []string, s string) bool {
}
return false
}
// ExecScript returns a command to execute a script
func ExecScript(script string) (*exec.Cmd, error) {
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
cmd := exec.Command(shell, flag, script)
return cmd, nil
}

View File

@ -0,0 +1,80 @@
package agent
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
)
const (
// Limit the size of a watch handlers's output to the
// last WatchBufSize. Prevents an enormous buffer
// from being captured
WatchBufSize = 4 * 1024 // 4KB
)
// verifyWatchHandler does the pre-check for our handler configuration
func verifyWatchHandler(params interface{}) error {
if params == nil {
return fmt.Errorf("Must provide watch handler")
}
_, ok := params.(string)
if !ok {
return fmt.Errorf("Watch handler must be a string")
}
return nil
}
// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Create the command
cmd, err := ExecScript(script)
if err != nil {
logger.Printf("[ERR] agent: Failed to setup watch: %v", err)
return
}
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)
// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
cmd.Stdout = output
cmd.Stderr = output
// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err)
return
}
cmd.Stdin = &inp
// Run the handler
if err := cmd.Run(); err != nil {
logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err)
}
// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
// Log the output
logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr)
}
return fn
}

View File

@ -0,0 +1,44 @@
package agent
import (
"io/ioutil"
"os"
"testing"
)
func TestVerifyWatchHandler(t *testing.T) {
if err := verifyWatchHandler(nil); err == nil {
t.Fatalf("should err")
}
if err := verifyWatchHandler(123); err == nil {
t.Fatalf("should err")
}
if err := verifyWatchHandler([]string{"foo"}); err == nil {
t.Fatalf("should err")
}
if err := verifyWatchHandler("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestMakeWatchHandler(t *testing.T) {
defer os.Remove("handler_out")
defer os.Remove("handler_index_out")
script := "echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out"
handler := makeWatchHandler(os.Stderr, script)
handler(100, []string{"foo", "bar", "baz"})
raw, err := ioutil.ReadFile("handler_out")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(raw) != "[\"foo\",\"bar\",\"baz\"]\n" {
t.Fatalf("bad: %s", raw)
}
raw, err = ioutil.ReadFile("handler_index_out")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(raw) != "100\n" {
t.Fatalf("bad: %s", raw)
}
}

View File

@ -2,6 +2,7 @@ package command
import (
"flag"
"github.com/armon/consul-api"
"github.com/hashicorp/consul/command/agent"
)
@ -16,3 +17,17 @@ func RPCAddrFlag(f *flag.FlagSet) *string {
func RPCClient(addr string) (*agent.RPCClient, error) {
return agent.NewRPCClient(addr)
}
// HTTPAddrFlag returns a pointer to a string that will be populated
// when the given flagset is parsed with the HTTP address of the Consul.
func HTTPAddrFlag(f *flag.FlagSet) *string {
return f.String("http-addr", "127.0.0.1:8500",
"HTTP address of the Consul agent")
}
// HTTPClient returns a new Consul HTTP client with the given address.
func HTTPClient(addr string) (*consulapi.Client, error) {
conf := consulapi.DefaultConfig()
conf.Address = addr
return consulapi.NewClient(conf)
}

View File

@ -22,16 +22,19 @@ func init() {
}
type agentWrapper struct {
dir string
config *agent.Config
agent *agent.Agent
rpc *agent.AgentRPC
addr string
dir string
config *agent.Config
agent *agent.Agent
rpc *agent.AgentRPC
http *agent.HTTPServer
addr string
httpAddr string
}
func (a *agentWrapper) Shutdown() {
a.rpc.Shutdown()
a.agent.Shutdown()
a.http.Shutdown()
os.RemoveAll(a.dir)
}
@ -59,12 +62,22 @@ func testAgent(t *testing.T) *agentWrapper {
}
rpc := agent.NewAgentRPC(a, l, mult, lw)
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP)
http, err := agent.NewHTTPServer(a, "", false, os.Stderr, httpAddr)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
}
return &agentWrapper{
dir: dir,
config: conf,
agent: a,
rpc: rpc,
addr: l.Addr().String(),
dir: dir,
config: conf,
agent: a,
rpc: rpc,
http: http,
addr: l.Addr().String(),
httpAddr: httpAddr,
}
}

211
command/watch.go Normal file
View File

@ -0,0 +1,211 @@
package command
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"os"
"strconv"
"strings"
"github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/cli"
)
// WatchCommand is a Command implementation that is used to setup
// a "watch" which uses a sub-process
type WatchCommand struct {
ShutdownCh <-chan struct{}
Ui cli.Ui
}
func (c *WatchCommand) Help() string {
helpText := `
Usage: consul watch [options] [child...]
Watches for changes in a given data view from Consul. If a child process
is specified, it will be invoked with the latest results on changes. Otherwise,
the latest values are dumped to stdout and the watch terminates.
Providing the watch type is required, and other parameters may be required
or supported depending on the watch type.
Options:
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
-datacenter="" Datacenter to query. Defaults to that of agent.
-token="" ACL token to use. Defaults to that of agent.
Watch Specification:
-key=val Specifies the key to watch. Only for 'key' type.
-passingonly=[true|false] Specifies if only hosts passing all checks are displayed.
Optional for 'service' type. Defaults false.
-prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type.
-service=val Specifies the service to watch. Required for 'service' type,
optional for 'checks' type.
-state=val Specifies the states to watch. Optional for 'checks' type.
-tag=val Specifies the service tag to filter on. Optional for 'service'
type.
-type=val Specifies the watch type. One of key, keyprefix
services, nodes, service, or checks.
`
return strings.TrimSpace(helpText)
}
func (c *WatchCommand) Run(args []string) int {
var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state string
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.StringVar(&watchType, "type", "", "")
cmdFlags.StringVar(&datacenter, "datacenter", "", "")
cmdFlags.StringVar(&token, "token", "", "")
cmdFlags.StringVar(&key, "key", "", "")
cmdFlags.StringVar(&prefix, "prefix", "", "")
cmdFlags.StringVar(&service, "service", "", "")
cmdFlags.StringVar(&tag, "tag", "", "")
cmdFlags.StringVar(&passingOnly, "passingonly", "", "")
cmdFlags.StringVar(&state, "state", "", "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
}
// Check for a type
if watchType == "" {
c.Ui.Error("Watch type must be specified")
c.Ui.Error("")
c.Ui.Error(c.Help())
return 1
}
// Grab the script to execute if any
script := strings.Join(cmdFlags.Args(), " ")
// Compile the watch parameters
params := make(map[string]interface{})
if watchType != "" {
params["type"] = watchType
}
if datacenter != "" {
params["datacenter"] = datacenter
}
if token != "" {
params["token"] = token
}
if key != "" {
params["key"] = key
}
if prefix != "" {
params["prefix"] = prefix
}
if service != "" {
params["service"] = service
}
if tag != "" {
params["tag"] = tag
}
if state != "" {
params["state"] = state
}
if passingOnly != "" {
b, err := strconv.ParseBool(passingOnly)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse passingonly flag: %s", err))
return 1
}
params["passingonly"] = b
}
// Create the watch
wp, err := watch.Parse(params)
if err != nil {
c.Ui.Error(fmt.Sprintf("%s", err))
return 1
}
// Create and test the HTTP client
client, err := HTTPClient(*httpAddr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
_, err = client.Agent().NodeName()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1
}
// Setup handler
errExit := false
if script == "" {
wp.Handler = func(idx uint64, data interface{}) {
defer wp.Stop()
buf, err := json.MarshalIndent(data, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err))
errExit = true
}
c.Ui.Output(string(buf))
}
} else {
wp.Handler = func(idx uint64, data interface{}) {
// Create the command
var buf bytes.Buffer
var err error
cmd, err := agent.ExecScript(script)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err))
goto ERR
}
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)
// Encode the input
if err = json.NewEncoder(&buf).Encode(data); err != nil {
c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err))
goto ERR
}
cmd.Stdin = &buf
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Run the handler
if err := cmd.Run(); err != nil {
c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err))
goto ERR
}
return
ERR:
wp.Stop()
errExit = true
}
}
// Watch for a shutdown
go func() {
<-c.ShutdownCh
wp.Stop()
os.Exit(0)
}()
// Run the watch
if err := wp.Run(*httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1
}
// Handle an error exit
if errExit {
return 1
} else {
return 0
}
}
func (c *WatchCommand) Synopsis() string {
return "Watch for changes in Consul"
}

29
command/watch_test.go Normal file
View File

@ -0,0 +1,29 @@
package command
import (
"github.com/mitchellh/cli"
"strings"
"testing"
)
func TestWatchCommand_implements(t *testing.T) {
var _ cli.Command = &WatchCommand{}
}
func TestWatchCommandRun(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
ui := new(cli.MockUi)
c := &WatchCommand{Ui: ui}
args := []string{"-http-addr=" + a1.httpAddr, "-type=nodes"}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
if !strings.Contains(ui.OutputWriter.String(), a1.config.NodeName) {
t.Fatalf("bad: %#v", ui.OutputWriter.String())
}
}

View File

@ -82,6 +82,13 @@ func init() {
Ui: ui,
}, nil
},
"watch": func() (cli.Command, error) {
return &command.WatchCommand{
ShutdownCh: makeShutdownCh(),
Ui: ui,
}, nil
},
}
}

166
watch/funcs.go Normal file
View File

@ -0,0 +1,166 @@
package watch
import (
"fmt"
"github.com/armon/consul-api"
)
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
type watchFactory func(params map[string]interface{}) (WatchFunc, error)
// watchFuncFactory maps each type to a factory function
var watchFuncFactory map[string]watchFactory
func init() {
watchFuncFactory = map[string]watchFactory{
"key": keyWatch,
"keyprefix": keyPrefixWatch,
"services": servicesWatch,
"nodes": nodesWatch,
"service": serviceWatch,
"checks": checksWatch,
}
}
// keyWatch is used to return a key watching function
func keyWatch(params map[string]interface{}) (WatchFunc, error) {
var key string
if err := assignValue(params, "key", &key); err != nil {
return nil, err
}
if key == "" {
return nil, fmt.Errorf("Must specify a single key to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
}
if pair == nil {
return meta.LastIndex, nil, err
}
return meta.LastIndex, pair, err
}
return fn, nil
}
// keyPrefixWatch is used to return a key prefix watching function
func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
var prefix string
if err := assignValue(params, "prefix", &prefix); err != nil {
return nil, err
}
if prefix == "" {
return nil, fmt.Errorf("Must specify a single prefix to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, pairs, err
}
return fn, nil
}
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, services, err
}
return fn, nil
}
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
// serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
var service, tag string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if service == "" {
return nil, fmt.Errorf("Must specify a single service to watch")
}
if err := assignValue(params, "tag", &tag); err != nil {
return nil, err
}
passingOnly := false
if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
// checksWatch is used to watch a specific checks in a given state
func checksWatch(params map[string]interface{}) (WatchFunc, error) {
var service, state string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
if service == "" && state == "" {
state = "any"
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
}
if err != nil {
return 0, nil, err
}
return meta.LastIndex, checks, err
}
return fn, nil
}

394
watch/funcs_test.go Normal file
View File

@ -0,0 +1,394 @@
package watch
import (
"os"
"testing"
"time"
"github.com/armon/consul-api"
)
var consulAddr string
func init() {
consulAddr = os.Getenv("CONSUL_ADDR")
}
func TestKeyWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(*consulapi.KVPair)
if !ok || v == nil || string(v.Value) != "test" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
kv := plan.client.KV()
pair := &consulapi.KVPair{
Key: "foo/bar/baz",
Value: []byte("test"),
}
_, err := kv.Put(pair, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the query to run
time.Sleep(20 * time.Millisecond)
plan.Stop()
// Delete the key
_, err = kv.Delete("foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestKeyPrefixWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(consulapi.KVPairs)
if ok && v == nil {
return
}
if !ok || v == nil || string(v[0].Key) != "foo/bar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
kv := plan.client.KV()
pair := &consulapi.KVPair{
Key: "foo/bar",
}
_, err := kv.Put(pair, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the query to run
time.Sleep(20 * time.Millisecond)
plan.Stop()
// Delete the key
_, err = kv.Delete("foo/bar", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestServicesWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"services"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(map[string][]string)
if !ok || v["consul"] == nil {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
plan.Stop()
agent := plan.client.Agent()
reg := &consulapi.AgentServiceRegistration{
ID: "foo",
Name: "foo",
}
agent.ServiceRegister(reg)
time.Sleep(20 * time.Millisecond)
agent.ServiceDeregister("foo")
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestNodesWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"nodes"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.Node)
if !ok || len(v) == 0 {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
plan.Stop()
catalog := plan.client.Catalog()
reg := &consulapi.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Register(reg, nil)
time.Sleep(20 * time.Millisecond)
dereg := &consulapi.CatalogDeregistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Deregister(dereg, nil)
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestServiceWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.ServiceEntry)
if ok && len(v) == 0 {
return
}
if !ok || v[0].Service.ID != "foo" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
agent := plan.client.Agent()
reg := &consulapi.AgentServiceRegistration{
ID: "foo",
Name: "foo",
Tags: []string{"bar"},
}
agent.ServiceRegister(reg)
time.Sleep(20 * time.Millisecond)
plan.Stop()
agent.ServiceDeregister("foo")
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestChecksWatch_State(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"checks", "state":"warning"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.HealthCheck)
if len(v) == 0 {
return
}
if !ok || v[0].CheckID != "foobar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
catalog := plan.client.Catalog()
reg := &consulapi.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Check: &consulapi.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: "warning",
},
}
catalog.Register(reg, nil)
time.Sleep(20 * time.Millisecond)
plan.Stop()
dereg := &consulapi.CatalogDeregistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Deregister(dereg, nil)
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestChecksWatch_Service(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, `{"type":"checks", "service":"foobar"}`)
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.HealthCheck)
if len(v) == 0 {
return
}
if !ok || v[0].CheckID != "foobar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
catalog := plan.client.Catalog()
reg := &consulapi.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &consulapi.AgentService{
ID: "foobar",
Service: "foobar",
},
Check: &consulapi.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: "passing",
ServiceID: "foobar",
},
}
_, err := catalog.Register(reg, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(20 * time.Millisecond)
plan.Stop()
dereg := &consulapi.CatalogDeregistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Deregister(dereg, nil)
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}

116
watch/plan.go Normal file
View File

@ -0,0 +1,116 @@
package watch
import (
"fmt"
"log"
"os"
"reflect"
"time"
"github.com/armon/consul-api"
)
const (
// retryInterval is the base retry value
retryInterval = 5 * time.Second
// maximum back off time, this is to prevent
// exponential runaway
maxBackoffTime = 180 * time.Second
)
// Run is used to run a watch plan
func (p *WatchPlan) Run(address string) error {
// Setup the client
p.address = address
conf := consulapi.DefaultConfig()
conf.Address = address
conf.Datacenter = p.Datacenter
conf.Token = p.Token
client, err := consulapi.NewClient(conf)
if err != nil {
return fmt.Errorf("Failed to connect to agent: %v", err)
}
p.client = client
// Create the logger
output := p.LogOutput
if output == nil {
output = os.Stderr
}
logger := log.New(output, "", log.LstdFlags)
// Loop until we are canceled
failures := 0
OUTER:
for !p.shouldStop() {
// Invoke the handler
index, result, err := p.Func(p)
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break
}
// Handle an error in the watch function
if err != nil {
// Perform an exponential backoff
failures++
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v",
p.Type, err, retry)
select {
case <-time.After(retry):
continue OUTER
case <-p.stopCh:
return nil
}
}
// Clear the failures
failures = 0
// If the index is unchanged do nothing
if index == p.lastIndex {
continue
}
// Update the index, look for change
oldIndex := p.lastIndex
p.lastIndex = index
if oldIndex != 0 && reflect.DeepEqual(p.lastResult, result) {
continue
}
// Handle the updated result
p.lastResult = result
if p.Handler != nil {
p.Handler(index, result)
}
}
return nil
}
// Stop is used to stop running the watch plan
func (p *WatchPlan) Stop() {
p.stopLock.Lock()
defer p.stopLock.Unlock()
if p.stop {
return
}
p.stop = true
close(p.stopCh)
}
func (p *WatchPlan) shouldStop() bool {
select {
case <-p.stopCh:
return true
default:
return false
}
}

54
watch/plan_test.go Normal file
View File

@ -0,0 +1,54 @@
package watch
import (
"testing"
"time"
)
func init() {
watchFuncFactory["noop"] = noopWatch
}
func noopWatch(params map[string]interface{}) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
idx := p.lastIndex + 1
return idx, idx, nil
}
return fn, nil
}
func mustParse(t *testing.T, q string) *WatchPlan {
params := makeParams(t, q)
plan, err := Parse(params)
if err != nil {
t.Fatalf("err: %v", err)
}
return plan
}
func TestRun_Stop(t *testing.T) {
plan := mustParse(t, `{"type":"noop"}`)
var expect uint64 = 1
plan.Handler = func(idx uint64, val interface{}) {
if idx != expect {
t.Fatalf("Bad: %d %d", expect, idx)
}
if val != expect {
t.Fatalf("Bad: %d %d", expect, val)
}
expect++
}
time.AfterFunc(10*time.Millisecond, func() {
plan.Stop()
})
err := plan.Run("127.0.0.1:8500")
if err != nil {
t.Fatalf("err: %v", err)
}
if expect == 1 {
t.Fatalf("Bad: %d", expect)
}
}

129
watch/watch.go Normal file
View File

@ -0,0 +1,129 @@
package watch
import (
"fmt"
"io"
"sync"
"github.com/armon/consul-api"
)
// WatchPlan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type WatchPlan struct {
Datacenter string
Token string
Type string
Exempt map[string]interface{}
Func WatchFunc
Handler HandlerFunc
LogOutput io.Writer
address string
client *consulapi.Client
lastIndex uint64
lastResult interface{}
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// WatchFunc is used to watch for a diff
type WatchFunc func(*WatchPlan) (uint64, interface{}, error)
// HandlerFunc is used to handle new data
type HandlerFunc func(uint64, interface{})
// Parse takes a watch query and compiles it into a WatchPlan or an error
func Parse(params map[string]interface{}) (*WatchPlan, error) {
return ParseExempt(params, nil)
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
// Any exempt parameters are stored in the Exempt map
func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) {
plan := &WatchPlan{
stopCh: make(chan struct{}),
}
// Parse the generic parameters
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
return nil, err
}
if err := assignValue(params, "token", &plan.Token); err != nil {
return nil, err
}
if err := assignValue(params, "type", &plan.Type); err != nil {
return nil, err
}
// Ensure there is a watch type
if plan.Type == "" {
return nil, fmt.Errorf("Watch type must be specified")
}
// Look for a factory function
factory := watchFuncFactory[plan.Type]
if factory == nil {
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
}
// Get the watch func
fn, err := factory(params)
if err != nil {
return nil, err
}
plan.Func = fn
// Remove the exempt parameters
if len(exempt) > 0 {
plan.Exempt = make(map[string]interface{})
for _, ex := range exempt {
val, ok := params[ex]
if ok {
plan.Exempt[ex] = val
delete(params, ex)
}
}
}
// Ensure all parameters are consumed
if len(params) != 0 {
var bad []string
for key := range params {
bad = append(bad, key)
}
return nil, fmt.Errorf("Invalid parameters: %v", bad)
}
return plan, nil
}
// assignValue is used to extract a value ensuring it is a string
func assignValue(params map[string]interface{}, name string, out *string) error {
if raw, ok := params[name]; ok {
val, ok := raw.(string)
if !ok {
return fmt.Errorf("Expecting %s to be a string")
}
*out = val
delete(params, name)
}
return nil
}
// assignValueBool is used to extract a value ensuring it is a bool
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
if raw, ok := params[name]; ok {
val, ok := raw.(bool)
if !ok {
return fmt.Errorf("Expecting %s to be a boolean")
}
*out = val
delete(params, name)
}
return nil
}

48
watch/watch_test.go Normal file
View File

@ -0,0 +1,48 @@
package watch
import (
"bytes"
"encoding/json"
"testing"
)
func TestParseBasic(t *testing.T) {
params := makeParams(t, `{"type":"key", "datacenter":"dc2", "token":"12345", "key":"foo"}`)
p, err := Parse(params)
if err != nil {
t.Fatalf("err: %v", err)
}
if p.Datacenter != "dc2" {
t.Fatalf("Bad: %#v", p)
}
if p.Token != "12345" {
t.Fatalf("Bad: %#v", p)
}
if p.Type != "key" {
t.Fatalf("Bad: %#v", p)
}
}
func TestParse_exempt(t *testing.T) {
params := makeParams(t, `{"type":"key", "key":"foo", "handler": "foobar"}`)
p, err := ParseExempt(params, []string{"handler"})
if err != nil {
t.Fatalf("err: %v", err)
}
if p.Type != "key" {
t.Fatalf("Bad: %#v", p)
}
ex := p.Exempt["handler"]
if ex != "foobar" {
t.Fatalf("bad: %v", ex)
}
}
func makeParams(t *testing.T, s string) map[string]interface{} {
var out map[string]interface{}
dec := json.NewDecoder(bytes.NewReader([]byte(s)))
if err := dec.Decode(&out); err != nil {
t.Fatalf("err: %v", err)
}
return out
}

View File

@ -145,7 +145,13 @@ definitions support being updated during a reload.
"data_dir": "/opt/consul",
"log_level": "INFO",
"node_name": "foobar",
"server": true
"server": true,
"watches": [
{
"type": "checks",
"handler": "/usr/bin/health-check-handler.sh"
}
]
}
</pre>
@ -316,6 +322,11 @@ definitions support being updated during a reload.
However, because the caches are not actively invalidated, ACL policy may be stale
up to the TTL value.
* `watches` - Watches is a list of watch specifications.
These allow an external process to be automatically invoked when a particular
data view is updated. See the [watch documentation](/docs/agent/watches.html) for
more documentation. Watches can be modified when the configuration is reloaded.
## Ports Used
Consul requires up to 5 different ports to work properly, some requiring

View File

@ -0,0 +1,286 @@
---
layout: "docs"
page_title: "Watches"
sidebar_current: "docs-agent-watches"
---
# Watches
Watches are a way of specifying a view of data (list of nodes, KV pairs,
health checks, etc) which is monitored for any updates. When an update
is detected, an external handler is invoked. A handler can be any
executable. As an example, you could watch the status of health checks and
notify an external system when a check is critical.
Watches are implemented using blocking queries in the [HTTP API](/docs/agent/http.html).
Agent's automatically make the proper API calls to watch for changes,
and inform a handler when the data view has updated.
Watches can can be configured as part of the [agent's configuration](/docs/agent/options.html),
causing them to run once the agent is initialized. Reloading the agent configuration
allows for adding or removing watches dynamically.
Alternatively, the [watch command](/docs/commands/watch.html) enables a watch to be
started outside of the agent. This can be used by an operator to inspect data in Consul,
or to easily pipe data into processes without being tied to the agent lifecycle.
In either case, the `type` of the watch must be specified. Each type of watch
supports different parameters, both required and optional. These options are specified
in a JSON body when using agent configuration, or as CLI flags for the watch command.
## Handlers
The watch specifiation specifies the view of data to be monitored.
Once that view is updated the specified handler is invoked. The handler
can be any executable.
A handler should read it's input from stdin, and expect to read
JSON formatted data. The format of the data depends on the type of the
watch. Each watch type documents the format type, and because they
map directly to an HTTP API, handlers should expect the input to
match the format of the API.
Additionally, the `CONSUL_INDEX` environmental variable will be set.
This maps to the `X-Consul-Index` value from the [HTTP API](/docs/agent/http.html).
## Global Parameters
In addition to the parameters supported by each option type, there
are a few global parameters that all watches support:
* `datacenter` - Can be provided to override the agent's default datacenter.
* `token` - Can be provided to override the agent's default ACL token.
* `handler` - The handler to invoke when the data view updates.
## Watch Types
The following types are supported, with more documentation below:
* `key` - Watch a specific KV pair
* `keyprefix` - Watch a prefix in the KV store
* `services` - Watch the list of available services
* `nodes` - Watch the list of nodes
* `service`- Watch the instances of a service
* `checks` - Watch the value of health checks
### Type: key
The "key" watch type is used to watch a specific key in the KV store.
It requires that the "key" parameter be specified.
This maps to the `/v1/kv/` API internally.
Here is an example configuration:
{
"type": "key",
"key": "foo/bar/baz",
"handler": "/usr/bin/my-key-handler.sh"
}
Or, using the watch command:
$ consul watch -type key -key foo/bar/baz /usr/bin/my-key-handler.sh
An example of the output of this command:
{
"Key": "foo/bar/baz",
"CreateIndex": 1793,
"ModifyIndex": 1793,
"LockIndex": 0,
"Flags": 0,
"Value": "aGV5",
"Session": ""
}
### Type: keyprefix
The "keyprefix" watch type is used to watch a prefix of keys in the KV store.
It requires that the "prefix" parameter be specified.
This maps to the `/v1/kv/` API internally.
Here is an example configuration:
{
"type": "keyprefix",
"prefix": "foo/",
"handler": "/usr/bin/my-prefix-handler.sh"
}
Or, using the watch command:
$ consul watch -type keyprefix -prefix foo/ /usr/bin/my-prefix-handler.sh
An example of the output of this command:
[
{
"Key": "foo/bar",
"CreateIndex": 1796,
"ModifyIndex": 1796,
"LockIndex": 0,
"Flags": 0,
"Value": "TU9BUg==",
"Session": ""
},
{
"Key": "foo/baz",
"CreateIndex": 1795,
"ModifyIndex": 1795,
"LockIndex": 0,
"Flags": 0,
"Value": "YXNkZg==",
"Session": ""
},
{
"Key": "foo/test",
"CreateIndex": 1793,
"ModifyIndex": 1793,
"LockIndex": 0,
"Flags": 0,
"Value": "aGV5",
"Session": ""
}
]
### Type: services
The "services" watch type is used to watch the list of available
services. It has no parameters.
This maps to the `/v1/catalog/services` API internally.
An example of the output of this command:
{
"consul": [],
"redis": [],
"web": []
}
### Type: nodes
The "nodes" watch type is used to watch the list of available
nodes. It has no parameters.
This maps to the `/v1/catalog/nodes` API internally.
An example of the output of this command:
[
{
"Node": "nyc1-consul-1",
"Address": "192.241.159.115"
},
{
"Node": "nyc1-consul-2",
"Address": "192.241.158.205"
},
{
"Node": "nyc1-consul-3",
"Address": "198.199.77.133"
},
{
"Node": "nyc1-worker-1",
"Address": "162.243.162.228"
},
{
"Node": "nyc1-worker-2",
"Address": "162.243.162.226"
},
{
"Node": "nyc1-worker-3",
"Address": "162.243.162.229"
}
]
### Type: service
The "service" watch type is used to monitor the providers
of a single service. It requires the "service" parameter,
but optionally takes "tag" and "passingonly". The "tag" parameter
will filter by tag, and "passingonly" is a boolean that will
filter to only the instances passing all health checks.
This maps to the `/v1/health/service` API internally.
Here is an example configuration:
{
"type": "service",
"key": "redis",
"handler": "/usr/bin/my-service-handler.sh"
}
Or, using the watch command:
$ consul watch -type service -service redis /usr/bin/my-service-handler.sh
An example of the output of this command:
[
{
"Node": {
"Node": "foobar",
"Address": "10.1.10.12"
},
"Service": {
"ID": "redis",
"Service": "redis",
"Tags": null,
"Port": 8000
},
"Checks": [
{
"Node": "foobar",
"CheckID": "service:redis",
"Name": "Service 'redis' check",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "redis",
"ServiceName": "redis"
},
{
"Node": "foobar",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "",
"ServiceName": ""
}
]
}
]
### Type: checks
The "checks" watch type is used to monitor the checks of a given
service or in a specific state. It optionally takes the "service"
parameter to filter to a specific service, or "state" to filter
to a specific state. By default, it will watch all checks.
This maps to the `/v1/health/state/` API if monitoring by state,
or `/v1/health/checks/` if monitoring by service.
An example of the output of this command:
[
{
"Node": "foobar",
"CheckID": "service:redis",
"Name": "Service 'redis' check",
"Status": "passing",
"Notes": "",
"Output": "",
"ServiceID": "redis",
"ServiceName": "redis"
}
]

View File

@ -34,6 +34,7 @@ Available commands are:
monitor Stream logs from a Consul agent
reload Triggers the agent to reload configuration files
version Prints the Consul version
watch Watch for changes in Consul
```
To get help for any specific command, pass the `-h` flag to the relevant

View File

@ -0,0 +1,53 @@
---
layout: "docs"
page_title: "Commands: Watch"
sidebar_current: "docs-commands-watch"
---
# Consul Watch
Command: `consul watch`
The watch command provides a mechanism to watch for changes in a particular
data view (list of nodes, service members, key value, etc) and to invoke
a process with the latest values of the view. If no process is specified,
the current values are dumped to stdout which can be a useful way to inspect
data in Consul.
There is more [documentation on watches here](/docs/agent/watches.html).
## Usage
Usage: `consul watch [options] [child...]`
The only required option is `-type` which specifies the particular
data view. Depending on the type, various options may be required
or optionally provided. There is more documentation on watch
[specifications here](/docs/agent/watches.html).
The list of available flags are:
* `-http-addr` - Address to the HTTP server of the agent you want to contact
to send this command. If this isn't specified, the command will contact
"127.0.0.1:8500" which is the default HTTP address of a Consul agent.
* `-datacenter` - Datacenter to query. Defaults to that of agent.
* `-token` - ACL token to use. Defaults to that of agent.
* `-key` - Key to watch. Only for `key` type.
* `-passingonly=[true|false]` - Should only passing entries be returned. Default false.
only for `service` type.
* `-prefix` - Key prefix to watch. Only for `keyprefix` type.
* `-service` - Service to watch. Required for `service` type, optional for `checks` type.
* `-state` - Check state to filter on. Optional for `checks` type.
* `-tag` - Service tag to filter on. Optional for `service` type.
* `-type` - Watch type. Required, one of "key", "keyprefix", "services",
"nodes", "services", or "checks".

View File

@ -57,6 +57,7 @@ Available commands are:
members Lists the members of a Consul cluster
monitor Stream logs from a Consul agent
version Prints the Consul version
watch Watch for changes in Consul
```
If you get an error that `consul` could not be found, then your PATH

View File

@ -89,6 +89,10 @@
<li<%= sidebar_current("docs-commands-reload") %>>
<a href="/docs/commands/reload.html">reload</a>
</li>
<li<%= sidebar_current("docs-commands-watch") %>>
<a href="/docs/commands/watch.html">watch</a>
</li>
</ul>
</li>
@ -130,6 +134,10 @@
<li<%= sidebar_current("docs-agent-telemetry") %>>
<a href="/docs/agent/telemetry.html">Telemetry</a>
</li>
<li<%= sidebar_current("docs-agent-watches") %>>
<a href="/docs/agent/watches.html">Watches</a>
</li>
</ul>