Refactor localBlockingQuery to use memdb.WatchSet. Much simpler and correct as a bonus!

This commit is contained in:
Paul Banks 2018-04-19 11:15:32 +01:00 committed by Mitchell Hashimoto
parent 8d09381b96
commit 1e72ad66f5
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A

View File

@ -9,6 +9,9 @@ import (
"strings"
"time"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
@ -27,7 +30,6 @@ import (
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
"github.com/mitchellh/go-testing-interface"
"github.com/mitchellh/hashstructure"
)
type Self struct {
@ -919,7 +921,7 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
hash := req.URL.Query().Get("hash")
return s.agentLocalBlockingQuery(resp, hash, &queryOpts,
func(updateCh chan struct{}) (string, interface{}, error) {
func(ws memdb.WatchSet) (string, interface{}, error) {
// Retrieve the proxy specified
proxy := s.agent.State.Proxy(id)
if proxy == nil {
@ -938,17 +940,8 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
return "", nil, nil
}
// Setup "watch" on the proxy being modified and respond on chan if it is.
go func() {
select {
case <-updateCh:
// blocking query timedout or was cancelled. Abort
return
case <-proxy.WatchCh:
// Proxy was updated or removed, report it
updateCh <- struct{}{}
}
}()
// Watch the proxy for changes
ws.Add(proxy.WatchCh)
hash, err := hashstructure.Hash(proxy.Proxy, nil)
if err != nil {
@ -970,7 +963,7 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
return nil, nil
}
type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, error)
type agentLocalBlockingFunc func(ws memdb.WatchSet) (string, interface{}, error)
// agentLocalBlockingQuery performs a blocking query in a generic way against
// local agent state that has no RPC or raft to back it. It uses `hash` paramter
@ -979,7 +972,10 @@ type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, e
func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash string,
queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) {
var timer *time.Timer
// If we are not blocking we can skip tracking and allocating - nil WatchSet
// is still valid to call Add on and will just be a no op.
var ws memdb.WatchSet
var timeout *time.Timer
if hash != "" {
// TODO(banks) at least define these defaults somewhere in a const. Would be
@ -993,31 +989,26 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timer = time.NewTimer(wait)
timeout = time.NewTimer(wait)
ws = memdb.NewWatchSet()
}
ch := make(chan struct{})
for {
curHash, curResp, err := fn(ch)
curHash, curResp, err := fn(ws)
if err != nil {
return curResp, err
}
// Hash was passed and matches current one, wait for update or timeout.
if timer != nil && hash == curHash {
select {
case <-ch:
// Update happened, loop to fetch a new value
continue
case <-timer.C:
// Timeout, stop the watcher goroutine and return what we have
close(ch)
break
}
// Return immediately if there is no timeout, the hash is different or the
// Watch returns true (indicating timeout fired). Note that Watch on a nil
// WatchSet immediately returns false which would incorrectly cause this to
// loop and repeat again, however we rely on the invariant that ws == nil
// IFF timeout == nil in which case the Watch call is never invoked.
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
resp.Header().Set("X-Consul-ContentHash", curHash)
return curResp, err
}
resp.Header().Set("X-Consul-ContentHash", curHash)
return curResp, err
// Watch returned false indicating a change was detected, loop and repeat
// the callback to load the new value.
}
}