diff --git a/agent/watch_handler.go b/agent/watch_handler.go index 27c7a430e9..4c6a9d3f3f 100644 --- a/agent/watch_handler.go +++ b/agent/watch_handler.go @@ -42,13 +42,7 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun } logger := log.New(logOutput, "", log.LstdFlags) - fn := func(blockVal watch.BlockingParam, data interface{}) { - idx, ok := blockVal.(watch.WaitIndexVal) - if !ok { - logger.Printf("[ERR] agent: watch handler doesn't support non-index watches") - return - } - + fn := func(idx uint64, data interface{}) { // Create the command var cmd *osexec.Cmd var err error @@ -64,7 +58,7 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun } cmd.Env = append(os.Environ(), - "CONSUL_INDEX="+strconv.FormatUint(uint64(idx), 10), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), ) // Collect the output @@ -102,13 +96,7 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) watch.HandlerFunc { logger := log.New(logOutput, "", log.LstdFlags) - fn := func(blockVal watch.BlockingParam, data interface{}) { - idx, ok := blockVal.(watch.WaitIndexVal) - if !ok { - logger.Printf("[ERR] agent: watch handler doesn't support non-index watches") - return - } - + fn := func(idx uint64, data interface{}) { trans := cleanhttp.DefaultTransport() // Skip SSL certificate verification if TLSSkipVerify is true @@ -144,7 +132,7 @@ func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) } req = req.WithContext(ctx) req.Header.Add("Content-Type", "application/json") - req.Header.Add("X-Consul-Index", strconv.FormatUint(uint64(idx), 10)) + req.Header.Add("X-Consul-Index", strconv.FormatUint(idx, 10)) for key, values := range config.Header { for _, val := range values { req.Header.Add(key, val) diff --git a/agent/watch_handler_test.go b/agent/watch_handler_test.go index 6851baf714..f7ba83b0a6 100644 --- a/agent/watch_handler_test.go +++ b/agent/watch_handler_test.go @@ -17,7 +17,7 @@ func TestMakeWatchHandler(t *testing.T) { defer os.Remove("handler_index_out") script := "bash -c 'echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out'" handler := makeWatchHandler(os.Stderr, script) - handler(watch.WaitIndexVal(100), []string{"foo", "bar", "baz"}) + handler(100, []string{"foo", "bar", "baz"}) raw, err := ioutil.ReadFile("handler_out") if err != nil { t.Fatalf("err: %v", err) @@ -62,5 +62,5 @@ func TestMakeHTTPWatchHandler(t *testing.T) { Timeout: time.Minute, } handler := makeHTTPWatchHandler(os.Stderr, &config) - handler(watch.WaitIndexVal(100), []string{"foo", "bar", "baz"}) + handler(100, []string{"foo", "bar", "baz"}) } diff --git a/command/watch/watch.go b/command/watch/watch.go index 2286de1cca..3b8c67836b 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -154,7 +154,7 @@ func (c *cmd) Run(args []string) int { // 1: true errExit := 0 if len(c.flags.Args()) == 0 { - wp.Handler = func(blockParam consulwatch.BlockingParam, data interface{}) { + wp.Handler = func(idx uint64, data interface{}) { defer wp.Stop() buf, err := json.MarshalIndent(data, "", " ") if err != nil { @@ -164,14 +164,7 @@ func (c *cmd) Run(args []string) int { c.UI.Output(string(buf)) } } else { - wp.Handler = func(blockVal consulwatch.BlockingParam, data interface{}) { - idx, ok := blockVal.(consulwatch.WaitIndexVal) - if !ok { - // TODO(banks): make this work for hash based watches. - c.UI.Error("Error: watch handler doesn't support non-index watches") - return - } - + wp.Handler = func(idx uint64, data interface{}) { doneCh := make(chan struct{}) defer close(doneCh) logFn := func(err error) { @@ -192,7 +185,7 @@ func (c *cmd) Run(args []string) int { goto ERR } cmd.Env = append(os.Environ(), - "CONSUL_INDEX="+strconv.FormatUint(uint64(idx), 10), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), ) // Encode the input diff --git a/connect/proxy/config.go b/connect/proxy/config.go index 3bd4db38bf..b5a8c6bb48 100644 --- a/connect/proxy/config.go +++ b/connect/proxy/config.go @@ -256,7 +256,7 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string, return w, nil } -func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParam, +func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal, val interface{}) { log.Printf("DEBUG: got hash %s", blockVal.(watch.WaitHashVal)) diff --git a/connect/service.go b/connect/service.go index 4c88877455..a614f227fd 100644 --- a/connect/service.go +++ b/connect/service.go @@ -236,7 +236,7 @@ func (s *Service) Close() error { return nil } -func (s *Service) rootsWatchHandler(blockParam watch.BlockingParam, raw interface{}) { +func (s *Service) rootsWatchHandler(blockParam watch.BlockingParamVal, raw interface{}) { if raw == nil { return } @@ -269,7 +269,7 @@ func (s *Service) rootsWatchHandler(blockParam watch.BlockingParam, raw interfac s.clientTLSCfg.SetTLSConfig(newCfg) } -func (s *Service) leafWatchHandler(blockParam watch.BlockingParam, raw interface{}) { +func (s *Service) leafWatchHandler(blockParam watch.BlockingParamVal, raw interface{}) { if raw == nil { return // ignore } diff --git a/watch/funcs.go b/watch/funcs.go index 3ad7f4f685..5e72e40a66 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -43,7 +43,7 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { if key == "" { return nil, fmt.Errorf("Must specify a single key to watch") } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -73,7 +73,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { if prefix == "" { return nil, fmt.Errorf("Must specify a single prefix to watch") } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -93,7 +93,7 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -113,7 +113,7 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -150,7 +150,7 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -184,7 +184,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { state = "any" } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -213,7 +213,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { event := p.client.Event() opts := makeQueryOptionsWithContext(p, false) defer p.cancelFunc() @@ -239,7 +239,7 @@ func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) { // We don't support stale since roots are likely to be cached locally in the // agent anyway. - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { agent := p.client.Agent() opts := makeQueryOptionsWithContext(p, false) defer p.cancelFunc() @@ -265,7 +265,7 @@ func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { agent := p.client.Agent() opts := makeQueryOptionsWithContext(p, false) defer p.cancelFunc() @@ -291,7 +291,7 @@ func connectProxyConfigWatch(params map[string]interface{}) (WatcherFunc, error) return nil, err } - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { agent := p.client.Agent() opts := makeQueryOptionsWithContext(p, false) defer p.cancelFunc() diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 89c5a1e801..d5253de444 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -32,7 +32,7 @@ func TestKeyWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -86,7 +86,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -140,7 +140,7 @@ func TestKeyPrefixWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -193,7 +193,7 @@ func TestServicesWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"services"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -247,7 +247,7 @@ func TestNodesWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"nodes"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -298,7 +298,7 @@ func TestServiceWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -354,7 +354,7 @@ func TestChecksWatch_State(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "state":"warning"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -415,7 +415,7 @@ func TestChecksWatch_Service(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -481,7 +481,7 @@ func TestEventWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"event", "name": "foo"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return } @@ -536,7 +536,7 @@ func TestConnectRootsWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"connect_roots"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -626,7 +626,7 @@ func TestConnectLeafWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"connect_leaf", "service_id":"web"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore } @@ -699,7 +699,7 @@ func TestConnectProxyConfigWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"connect_proxy_config", "proxy_service_id":"web-proxy"}`) - plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + plan.HybridHandler = func(blockParamVal watch.BlockingParamVal, raw interface{}) { if raw == nil { return // ignore } diff --git a/watch/plan.go b/watch/plan.go index 6292c19a41..1e34e4eacf 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -110,8 +110,16 @@ OUTER: // Handle the updated result p.lastResult = result - if p.Handler != nil { - p.Handler(blockParamVal, result) + // If a hybrid handler exists use that + if p.HybridHandler != nil { + p.HybridHandler(blockParamVal, result) + } else if p.Handler != nil { + idx, ok := blockParamVal.(WaitIndexVal) + if !ok { + logger.Printf("[ERR] consul.watch: Handler only supports index-based " + + " watches but non index-based watch run. Skipping Handler.") + } + p.Handler(uint64(idx), result) } } return nil diff --git a/watch/plan_test.go b/watch/plan_test.go index 6099dc2943..0ac648508a 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -10,7 +10,7 @@ func init() { } func noopWatch(params map[string]interface{}) (WatcherFunc, error) { - fn := func(p *Plan) (BlockingParam, interface{}, error) { + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { idx := WaitIndexVal(0) if i, ok := p.lastParamVal.(WaitIndexVal); ok { idx = i @@ -35,10 +35,57 @@ func TestRun_Stop(t *testing.T) { var expect uint64 = 1 doneCh := make(chan struct{}) - plan.Handler = func(blockParamVal BlockingParam, val interface{}) { + plan.Handler = func(idx uint64, val interface{}) { + if idx != expect { + t.Fatalf("Bad: %d %d", expect, idx) + } + if val != expect { + t.Fatalf("Bad: %d %d", expect, val) + } + if expect == 1 { + close(doneCh) + } + expect++ + } + + errCh := make(chan error, 1) + go func() { + errCh <- plan.Run("127.0.0.1:8500") + }() + + select { + case <-doneCh: + plan.Stop() + + case <-time.After(1 * time.Second): + t.Fatalf("handler never ran") + } + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("err: %v", err) + } + + case <-time.After(1 * time.Second): + t.Fatalf("watcher didn't exit") + } + + if expect == 1 { + t.Fatalf("Bad: %d", expect) + } +} + +func TestRun_Stop_Hybrid(t *testing.T) { + t.Parallel() + plan := mustParse(t, `{"type":"noop"}`) + + var expect uint64 = 1 + doneCh := make(chan struct{}) + plan.HybridHandler = func(blockParamVal BlockingParamVal, val interface{}) { idxVal, ok := blockParamVal.(WaitIndexVal) if !ok { - t.Fatalf("Expected index-based watch") + t.Fatalf("expected index-based watch") } idx := uint64(idxVal) if idx != expect { diff --git a/watch/watch.go b/watch/watch.go index b520d702e8..1bc3d0ae6f 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -24,13 +24,16 @@ type Plan struct { HandlerType string Exempt map[string]interface{} - Watcher WatcherFunc - Handler HandlerFunc - LogOutput io.Writer + Watcher WatcherFunc + // Handler is kept for backward compatibility but only supports watches based + // on index param. To support hash based watches, set HybridHandler instead. + Handler HandlerFunc + HybridHandler HybridHandlerFunc + LogOutput io.Writer address string client *consulapi.Client - lastParamVal BlockingParam + lastParamVal BlockingParamVal lastResult interface{} stop bool @@ -48,39 +51,39 @@ type HttpHandlerConfig struct { TLSSkipVerify bool `mapstructure:"tls_skip_verify"` } -// BlockingParam is an interface representing the common operations needed for +// BlockingParamVal is an interface representing the common operations needed for // different styles of blocking. It's used to abstract the core watch plan from // whether we are performing index-based or hash-based blocking. -type BlockingParam interface { +type BlockingParamVal interface { // Equal returns whether the other param value should be considered equal // (i.e. representing no change in the watched resource). Equal must not panic // if other is nil. - Equal(other BlockingParam) bool + Equal(other BlockingParamVal) bool // Next is called when deciding which value to use on the next blocking call. - // It assumes the BlockingParam value it is called on is the most recent one + // It assumes the BlockingParamVal value it is called on is the most recent one // returned and passes the previous one which may be nil as context. This // allows types to customise logic around ordering without assuming there is // an order. For example WaitIndexVal can check that the index didn't go // backwards and if it did then reset to 0. Most other cases should just // return themselves (the most recent value) to be used in the next request. - Next(previous BlockingParam) BlockingParam + Next(previous BlockingParamVal) BlockingParamVal } // WaitIndexVal is a type representing a Consul index that implements -// BlockingParam. +// BlockingParamVal. type WaitIndexVal uint64 -// Equal implements BlockingParam -func (idx WaitIndexVal) Equal(other BlockingParam) bool { +// Equal implements BlockingParamVal +func (idx WaitIndexVal) Equal(other BlockingParamVal) bool { if otherIdx, ok := other.(WaitIndexVal); ok { return idx == otherIdx } return false } -// Next implements BlockingParam -func (idx WaitIndexVal) Next(previous BlockingParam) BlockingParam { +// Next implements BlockingParamVal +func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal { if previous == nil { return idx } @@ -93,27 +96,33 @@ func (idx WaitIndexVal) Next(previous BlockingParam) BlockingParam { } // WaitHashVal is a type representing a Consul content hash that implements -// BlockingParam. +// BlockingParamVal. type WaitHashVal string -// Equal implements BlockingParam -func (h WaitHashVal) Equal(other BlockingParam) bool { +// Equal implements BlockingParamVal +func (h WaitHashVal) Equal(other BlockingParamVal) bool { if otherHash, ok := other.(WaitHashVal); ok { return h == otherHash } return false } -// Next implements BlockingParam -func (h WaitHashVal) Next(previous BlockingParam) BlockingParam { +// Next implements BlockingParamVal +func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal { return h } // WatcherFunc is used to watch for a diff. -type WatcherFunc func(*Plan) (BlockingParam, interface{}, error) +type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error) -// HandlerFunc is used to handle new data -type HandlerFunc func(BlockingParam, interface{}) +// HandlerFunc is used to handle new data. It only works for index-based watches +// (which is almost all end points currently) and is kept for backwards +// compatibility until more places can make use of hash-based watches too. +type HandlerFunc func(uint64, interface{}) + +// HybridHandlerFunc is used to handle new data. It can support either +// index-based or hash-based watches via the BlockingParamVal. +type HybridHandlerFunc func(BlockingParamVal, interface{}) // Parse takes a watch query and compiles it into a WatchPlan or an error func Parse(params map[string]interface{}) (*Plan, error) {