diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index b3e3741a84..24685ee92a 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -9,6 +9,9 @@ import ( "strings" "time" + "github.com/hashicorp/go-memdb" + "github.com/mitchellh/hashstructure" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" @@ -27,7 +30,6 @@ import ( // NOTE(mitcehllh): This is temporary while certs are stubbed out. "github.com/mitchellh/go-testing-interface" - "github.com/mitchellh/hashstructure" ) type Self struct { @@ -919,7 +921,7 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http hash := req.URL.Query().Get("hash") return s.agentLocalBlockingQuery(resp, hash, &queryOpts, - func(updateCh chan struct{}) (string, interface{}, error) { + func(ws memdb.WatchSet) (string, interface{}, error) { // Retrieve the proxy specified proxy := s.agent.State.Proxy(id) if proxy == nil { @@ -938,17 +940,8 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http return "", nil, nil } - // Setup "watch" on the proxy being modified and respond on chan if it is. - go func() { - select { - case <-updateCh: - // blocking query timedout or was cancelled. Abort - return - case <-proxy.WatchCh: - // Proxy was updated or removed, report it - updateCh <- struct{}{} - } - }() + // Watch the proxy for changes + ws.Add(proxy.WatchCh) hash, err := hashstructure.Hash(proxy.Proxy, nil) if err != nil { @@ -970,7 +963,7 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http return nil, nil } -type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, error) +type agentLocalBlockingFunc func(ws memdb.WatchSet) (string, interface{}, error) // agentLocalBlockingQuery performs a blocking query in a generic way against // local agent state that has no RPC or raft to back it. It uses `hash` paramter @@ -979,7 +972,10 @@ type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, e func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash string, queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) { - var timer *time.Timer + // If we are not blocking we can skip tracking and allocating - nil WatchSet + // is still valid to call Add on and will just be a no op. + var ws memdb.WatchSet + var timeout *time.Timer if hash != "" { // TODO(banks) at least define these defaults somewhere in a const. Would be @@ -993,31 +989,26 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri } // Apply a small amount of jitter to the request. wait += lib.RandomStagger(wait / 16) - timer = time.NewTimer(wait) + timeout = time.NewTimer(wait) + ws = memdb.NewWatchSet() } - ch := make(chan struct{}) - for { - curHash, curResp, err := fn(ch) + curHash, curResp, err := fn(ws) if err != nil { return curResp, err } - // Hash was passed and matches current one, wait for update or timeout. - if timer != nil && hash == curHash { - select { - case <-ch: - // Update happened, loop to fetch a new value - continue - case <-timer.C: - // Timeout, stop the watcher goroutine and return what we have - close(ch) - break - } + // Return immediately if there is no timeout, the hash is different or the + // Watch returns true (indicating timeout fired). Note that Watch on a nil + // WatchSet immediately returns false which would incorrectly cause this to + // loop and repeat again, however we rely on the invariant that ws == nil + // IFF timeout == nil in which case the Watch call is never invoked. + if timeout == nil || hash != curHash || ws.Watch(timeout.C) { + resp.Header().Set("X-Consul-ContentHash", curHash) + return curResp, err } - - resp.Header().Set("X-Consul-ContentHash", curHash) - return curResp, err + // Watch returned false indicating a change was detected, loop and repeat + // the callback to load the new value. } }