Fix socket file handle leaks from old blocking queries upon consul reload. This fixes issue #3018

This commit is contained in:
Preetha Appan 2017-06-26 15:52:03 -05:00
parent 4b51d00458
commit 9eaf56bfe3
4 changed files with 42 additions and 12 deletions

View File

@ -105,6 +105,10 @@ type QueryOptions struct {
// relayed back to the sender through N other random nodes. Must be // relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive). // a value from 0 to 5 (inclusive).
RelayFactor uint8 RelayFactor uint8
// Context (optional) is passed through to the underlying http request layer, can be used
// to set timeouts and deadlines as well as to cancel requests
Context context.Context
} }
// WriteOptions are used to parameterize a write // WriteOptions are used to parameterize a write
@ -457,6 +461,7 @@ type request struct {
body io.Reader body io.Reader
header http.Header header http.Header
obj interface{} obj interface{}
ctx context.Context
} }
// setQueryOptions is used to annotate the request with // setQueryOptions is used to annotate the request with
@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.RelayFactor != 0 { if q.RelayFactor != 0 {
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
} }
r.ctx = q.Context
} }
// durToMsec converts a duration to a millisecond specified string. If the // durToMsec converts a duration to a millisecond specified string. If the
@ -569,9 +575,12 @@ func (r *request) toHTTP() (*http.Request, error) {
if r.config.HttpAuth != nil { if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
} }
if r.ctx != nil {
return req.WithContext(r.ctx), nil
} else {
return req, nil return req, nil
} }
}
// newRequest is used to create a new request // newRequest is used to create a new request
func (c *Client) newRequest(method, path string) *request { func (c *Client) newRequest(method, path string) *request {

View File

@ -1,6 +1,7 @@
package watch package watch
import ( import (
"context"
"fmt" "fmt"
consulapi "github.com/hashicorp/consul/api" consulapi "github.com/hashicorp/consul/api"
@ -41,7 +42,9 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
} }
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pair, meta, err := kv.Get(key, &opts) pair, meta, err := kv.Get(key, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -70,7 +73,9 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
} }
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV() kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pairs, meta, err := kv.List(prefix, &opts) pairs, meta, err := kv.List(prefix, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -89,7 +94,9 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
services, meta, err := catalog.Services(&opts) services, meta, err := catalog.Services(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -108,7 +115,9 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := catalog.Nodes(&opts) nodes, meta, err := catalog.Nodes(&opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -144,7 +153,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health() health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts) nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -177,7 +188,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health() health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
var checks []*consulapi.HealthCheck var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta var meta *consulapi.QueryMeta
var err error var err error
@ -205,7 +218,9 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (uint64, interface{}, error) { fn := func(p *Plan) (uint64, interface{}, error) {
event := p.client.Event() event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx}
events, meta, err := event.List(name, &opts) events, meta, err := event.List(name, &opts)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err

View File

@ -107,6 +107,9 @@ func (p *Plan) Stop() {
return return
} }
p.stop = true p.stop = true
if p.cancelFunc != nil {
p.cancelFunc()
}
close(p.stopCh) close(p.stopCh)
} }

View File

@ -5,6 +5,8 @@ import (
"io" "io"
"sync" "sync"
"context"
consulapi "github.com/hashicorp/consul/api" consulapi "github.com/hashicorp/consul/api"
) )
@ -30,6 +32,7 @@ type Plan struct {
stop bool stop bool
stopCh chan struct{} stopCh chan struct{}
stopLock sync.Mutex stopLock sync.Mutex
cancelFunc context.CancelFunc
} }
// WatcherFunc is used to watch for a diff // WatcherFunc is used to watch for a diff