diff --git a/watch/funcs.go b/watch/funcs.go index 21bf593ae3..dfc12220a7 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -42,9 +42,7 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -73,9 +71,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -94,9 +90,7 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -115,9 +109,7 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -153,9 +145,7 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -188,9 +178,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -218,9 +206,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, false) events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err @@ -237,3 +223,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { } return fn, nil } + +func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + return opts +}