Made helper method for query options with context

This commit is contained in:
Preetha Appan 2017-06-26 16:11:14 -05:00
parent 9eaf56bfe3
commit a6c3cf0403

View File

@ -42,9 +42,7 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
} }
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pair, meta, err := kv.Get(key, &opts) pair, meta, err := kv.Get(key, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -73,9 +71,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
} }
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pairs, meta, err := kv.List(prefix, &opts) pairs, meta, err := kv.List(prefix, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -94,9 +90,7 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
services, meta, err := catalog.Services(&opts) services, meta, err := catalog.Services(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -115,9 +109,7 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := catalog.Nodes(&opts) nodes, meta, err := catalog.Nodes(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -153,9 +145,7 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health() health := p.client.Health()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts) nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -188,9 +178,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health() health := p.client.Health()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, stale)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
var checks []*consulapi.HealthCheck var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta var meta *consulapi.QueryMeta
var err error var err error
@ -218,9 +206,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
event := p.client.Event() event := p.client.Event()
ctx, cancel := context.WithCancel(context.Background()) opts := makeQueryOptionsWithContext(p, false)
p.cancelFunc = cancel
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx}
events, meta, err := event.List(name, &opts) events, meta, err := event.List(name, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -237,3 +223,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
} }
return fn, nil return fn, nil
} }
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
return opts
}