diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 798c370b2e..d500b17bae 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -942,7 +942,7 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http. // // Returns the local proxy config for the identified proxy. Requires token= // param with the correct local ProxyToken (not ACL token). -func (s *HTTPServer) ConnectProxyConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Get the proxy ID. Note that this is the ID of a proxy's service instance. id := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/proxy/") diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index d6b1996ddc..d5ea7305aa 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -2292,7 +2292,7 @@ func TestAgentConnectProxy(t *testing.T) { ProxyServiceID: "test-proxy", TargetServiceID: "test", TargetServiceName: "test", - ContentHash: "a15dccb216d38a6e", + ContentHash: "84346af2031659c9", ExecMode: "daemon", Command: "", Config: map[string]interface{}{ @@ -2310,7 +2310,7 @@ func TestAgentConnectProxy(t *testing.T) { ur, err := copystructure.Copy(expectedResponse) require.NoError(t, err) updatedResponse := ur.(*api.ConnectProxyConfig) - updatedResponse.ContentHash = "22bc9233a52c08fd" + updatedResponse.ContentHash = "7d53473b0e9db5a" upstreams := updatedResponse.Config["upstreams"].([]interface{}) upstreams = append(upstreams, map[string]interface{}{ @@ -2337,7 +2337,7 @@ func TestAgentConnectProxy(t *testing.T) { }, { name: "blocking fetch timeout, no change", - url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e&wait=100ms", + url: "/v1/agent/connect/proxy/test-proxy?hash=" + expectedResponse.ContentHash + "&wait=100ms", wantWait: 100 * time.Millisecond, wantCode: 200, wantErr: false, @@ -2352,7 +2352,7 @@ func TestAgentConnectProxy(t *testing.T) { }, { name: "blocking fetch returns change", - url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e", + url: "/v1/agent/connect/proxy/test-proxy?hash=" + expectedResponse.ContentHash, updateFunc: func() { time.Sleep(100 * time.Millisecond) // Re-register with new proxy config @@ -2393,7 +2393,7 @@ func TestAgentConnectProxy(t *testing.T) { go tt.updateFunc() } start := time.Now() - obj, err := a.srv.ConnectProxyConfig(resp, req) + obj, err := a.srv.AgentConnectProxyConfig(resp, req) elapsed := time.Now().Sub(start) if tt.wantErr { diff --git a/agent/http_oss.go b/agent/http_oss.go index 124a268755..d9b8068ef2 100644 --- a/agent/http_oss.go +++ b/agent/http_oss.go @@ -32,6 +32,7 @@ func init() { registerEndpoint("/v1/agent/connect/authorize", []string{"POST"}, (*HTTPServer).AgentConnectAuthorize) registerEndpoint("/v1/agent/connect/ca/roots", []string{"GET"}, (*HTTPServer).AgentConnectCARoots) registerEndpoint("/v1/agent/connect/ca/leaf/", []string{"GET"}, (*HTTPServer).AgentConnectCALeafCert) + registerEndpoint("/v1/agent/connect/proxy/", []string{"GET"}, (*HTTPServer).AgentConnectProxyConfig) registerEndpoint("/v1/agent/service/register", []string{"PUT"}, (*HTTPServer).AgentRegisterService) registerEndpoint("/v1/agent/service/deregister/", []string{"PUT"}, (*HTTPServer).AgentDeregisterService) registerEndpoint("/v1/agent/service/maintenance/", []string{"PUT"}, (*HTTPServer).AgentServiceMaintenance) diff --git a/agent/structs/connect.go b/agent/structs/connect.go index 5f907c1ab0..20970c1bf8 100644 --- a/agent/structs/connect.go +++ b/agent/structs/connect.go @@ -66,8 +66,11 @@ type ConnectManagedProxy struct { // ProxyService is a pointer to the local proxy's service record for // convenience. The proxies ID and name etc. can be read from there. It may be - // nil if the agent is starting up and hasn't registered the service yet. - ProxyService *NodeService + // nil if the agent is starting up and hasn't registered the service yet. We + // ignore it when calculating the hash value since the only thing that effects + // the proxy's config is the ID of the target service which is already + // represented below. + ProxyService *NodeService `hash:"ignore"` // TargetServiceID is the ID of the target service on the localhost. It may // not exist yet since bootstrapping is allowed to happen in either order. diff --git a/agent/watch_handler.go b/agent/watch_handler.go index 4c6a9d3f3f..27c7a430e9 100644 --- a/agent/watch_handler.go +++ b/agent/watch_handler.go @@ -42,7 +42,13 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun } logger := log.New(logOutput, "", log.LstdFlags) - fn := func(idx uint64, data interface{}) { + fn := func(blockVal watch.BlockingParam, data interface{}) { + idx, ok := blockVal.(watch.WaitIndexVal) + if !ok { + logger.Printf("[ERR] agent: watch handler doesn't support non-index watches") + return + } + // Create the command var cmd *osexec.Cmd var err error @@ -58,7 +64,7 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun } cmd.Env = append(os.Environ(), - "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + "CONSUL_INDEX="+strconv.FormatUint(uint64(idx), 10), ) // Collect the output @@ -96,7 +102,13 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) watch.HandlerFunc { logger := log.New(logOutput, "", log.LstdFlags) - fn := func(idx uint64, data interface{}) { + fn := func(blockVal watch.BlockingParam, data interface{}) { + idx, ok := blockVal.(watch.WaitIndexVal) + if !ok { + logger.Printf("[ERR] agent: watch handler doesn't support non-index watches") + return + } + trans := cleanhttp.DefaultTransport() // Skip SSL certificate verification if TLSSkipVerify is true @@ -132,7 +144,7 @@ func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) } req = req.WithContext(ctx) req.Header.Add("Content-Type", "application/json") - req.Header.Add("X-Consul-Index", strconv.FormatUint(idx, 10)) + req.Header.Add("X-Consul-Index", strconv.FormatUint(uint64(idx), 10)) for key, values := range config.Header { for _, val := range values { req.Header.Add(key, val) diff --git a/agent/watch_handler_test.go b/agent/watch_handler_test.go index f7ba83b0a6..6851baf714 100644 --- a/agent/watch_handler_test.go +++ b/agent/watch_handler_test.go @@ -17,7 +17,7 @@ func TestMakeWatchHandler(t *testing.T) { defer os.Remove("handler_index_out") script := "bash -c 'echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out'" handler := makeWatchHandler(os.Stderr, script) - handler(100, []string{"foo", "bar", "baz"}) + handler(watch.WaitIndexVal(100), []string{"foo", "bar", "baz"}) raw, err := ioutil.ReadFile("handler_out") if err != nil { t.Fatalf("err: %v", err) @@ -62,5 +62,5 @@ func TestMakeHTTPWatchHandler(t *testing.T) { Timeout: time.Minute, } handler := makeHTTPWatchHandler(os.Stderr, &config) - handler(100, []string{"foo", "bar", "baz"}) + handler(watch.WaitIndexVal(100), []string{"foo", "bar", "baz"}) } diff --git a/api/agent_test.go b/api/agent_test.go index 01d35ae159..8cc58e012c 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1087,20 +1087,31 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { Name: "foo", Tags: []string{"bar", "baz"}, Port: 8000, - Check: &AgentServiceCheck{ - CheckID: "foo-ttl", - TTL: "15s", + Connect: &AgentServiceConnect{ + Proxy: &AgentServiceConnectProxy{ + Config: map[string]interface{}{ + "foo": "bar", + }, + }, }, } if err := agent.ServiceRegister(reg); err != nil { t.Fatalf("err: %v", err) } - checks, err := agent.Checks() - if err != nil { - t.Fatalf("err: %v", err) - } - if _, ok := checks["foo-ttl"]; !ok { - t.Fatalf("missing check: %v", checks) + config, qm, err := agent.ConnectProxyConfig("foo-proxy", nil) + require.NoError(t, err) + expectConfig := &ConnectProxyConfig{ + ProxyServiceID: "foo-proxy", + TargetServiceID: "foo", + TargetServiceName: "foo", + ContentHash: "e662ea8600d84cf0", + ExecMode: "daemon", + Command: "", + Config: map[string]interface{}{ + "foo": "bar", + }, } + require.Equal(t, expectConfig, config) + require.Equal(t, "e662ea8600d84cf0", qm.LastContentHash) } diff --git a/api/api.go b/api/api.go index 6f3034d90a..6d64366377 100644 --- a/api/api.go +++ b/api/api.go @@ -175,6 +175,11 @@ type QueryMeta struct { // a blocking query LastIndex uint64 + // LastContentHash. This can be used as a WaitHash to perform a blocking query + // for endpoints that support hash-based blocking. Endpoints that do not + // support it will return an empty hash. + LastContentHash string + // Time of last contact from the leader for the // server servicing the request LastContact time.Duration @@ -733,12 +738,16 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (* func parseQueryMeta(resp *http.Response, q *QueryMeta) error { header := resp.Header - // Parse the X-Consul-Index - index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64) - if err != nil { - return fmt.Errorf("Failed to parse X-Consul-Index: %v", err) + // Parse the X-Consul-Index (if it's set - hash based blocking queries don't + // set this) + if indexStr := header.Get("X-Consul-Index"); indexStr != "" { + index, err := strconv.ParseUint(indexStr, 10, 64) + if err != nil { + return fmt.Errorf("Failed to parse X-Consul-Index: %v", err) + } + q.LastIndex = index } - q.LastIndex = index + q.LastContentHash = header.Get("X-Consul-ContentHash") // Parse the X-Consul-LastContact last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64) diff --git a/command/watch/watch.go b/command/watch/watch.go index 3b8c67836b..2286de1cca 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -154,7 +154,7 @@ func (c *cmd) Run(args []string) int { // 1: true errExit := 0 if len(c.flags.Args()) == 0 { - wp.Handler = func(idx uint64, data interface{}) { + wp.Handler = func(blockParam consulwatch.BlockingParam, data interface{}) { defer wp.Stop() buf, err := json.MarshalIndent(data, "", " ") if err != nil { @@ -164,7 +164,14 @@ func (c *cmd) Run(args []string) int { c.UI.Output(string(buf)) } } else { - wp.Handler = func(idx uint64, data interface{}) { + wp.Handler = func(blockVal consulwatch.BlockingParam, data interface{}) { + idx, ok := blockVal.(consulwatch.WaitIndexVal) + if !ok { + // TODO(banks): make this work for hash based watches. + c.UI.Error("Error: watch handler doesn't support non-index watches") + return + } + doneCh := make(chan struct{}) defer close(doneCh) logFn := func(err error) { @@ -185,7 +192,7 @@ func (c *cmd) Run(args []string) int { goto ERR } cmd.Env = append(os.Environ(), - "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + "CONSUL_INDEX="+strconv.FormatUint(uint64(idx), 10), ) // Encode the input diff --git a/connect/service.go b/connect/service.go index f9d6591c24..4c88877455 100644 --- a/connect/service.go +++ b/connect/service.go @@ -3,6 +3,7 @@ package connect import ( "context" "crypto/tls" + "crypto/x509" "errors" "log" "net" @@ -11,6 +12,7 @@ import ( "time" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/watch" "golang.org/x/net/http2" ) @@ -52,6 +54,9 @@ type Service struct { // TODO(banks): write the proper implementation httpResolverFromAddr func(addr string) (Resolver, error) + rootsWatch *watch.Plan + leafWatch *watch.Plan + logger *log.Logger } @@ -73,7 +78,28 @@ func NewServiceWithLogger(serviceID string, client *api.Client, tlsCfg: newDynamicTLSConfig(defaultTLSConfig()), } - // TODO(banks) run the background certificate sync + // Set up root and leaf watches + p, err := watch.Parse(map[string]interface{}{ + "type": "connect_roots", + }) + if err != nil { + return nil, err + } + s.rootsWatch = p + s.rootsWatch.Handler = s.rootsWatchHandler + + p, err = watch.Parse(map[string]interface{}{ + "type": "connect_leaf", + }) + if err != nil { + return nil, err + } + s.leafWatch = p + s.leafWatch.Handler = s.leafWatchHandler + + //go s.rootsWatch.RunWithClientAndLogger(s.client, s.logger) + //go s.leafWatch.RunWithClientAndLogger(s.client, s.logger) + return s, nil } @@ -201,6 +227,75 @@ func (s *Service) HTTPClient() *http.Client { // Close stops the service and frees resources. func (s *Service) Close() error { - // TODO(banks): stop background activity if started + if s.rootsWatch != nil { + s.rootsWatch.Stop() + } + if s.leafWatch != nil { + s.leafWatch.Stop() + } return nil } + +func (s *Service) rootsWatchHandler(blockParam watch.BlockingParam, raw interface{}) { + if raw == nil { + return + } + v, ok := raw.(*api.CARootList) + if !ok || v == nil { + s.logger.Println("[ERR] got invalid response from root watch") + return + } + + // Got new root certificates, update the tls.Configs. + roots := x509.NewCertPool() + for _, root := range v.Roots { + roots.AppendCertsFromPEM([]byte(root.RootCertPEM)) + } + + // Note that SetTLSConfig takes care of adding a dynamic GetConfigForClient + // hook that will fetch this updated config for new incoming connections on a + // server. That means all future connections are validated against the new + // roots. On a client, we only expose Dial and we fetch the most recent config + // each time so all future Dials (direct or via an http.Client with our dial + // hook) will grab this new config. + newCfg := s.serverTLSCfg.TLSConfig() + // Server-side verification uses ClientCAs. + newCfg.ClientCAs = roots + s.serverTLSCfg.SetTLSConfig(newCfg) + + newCfg = s.clientTLSCfg.TLSConfig() + // Client-side verification uses RootCAs. + newCfg.RootCAs = roots + s.clientTLSCfg.SetTLSConfig(newCfg) +} + +func (s *Service) leafWatchHandler(blockParam watch.BlockingParam, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*api.LeafCert) + if !ok || v == nil { + s.logger.Println("[ERR] got invalid response from root watch") + return + } + + // Got new leaf, update the tls.Configs + cert, err := tls.X509KeyPair([]byte(v.CertPEM), []byte(v.PrivateKeyPEM)) + if err != nil { + s.logger.Printf("[ERR] failed to parse new leaf cert: %s", err) + return + } + + // Note that SetTLSConfig takes care of adding a dynamic GetClientCertificate + // hook that will fetch the first cert from the Certificates slice of the + // current config for each outbound client request even if the client is using + // an old version of the config struct so all we need to do it set that and + // all existing clients will start using the new cert. + newCfg := s.serverTLSCfg.TLSConfig() + newCfg.Certificates = []tls.Certificate{cert} + s.serverTLSCfg.SetTLSConfig(newCfg) + + newCfg = s.clientTLSCfg.TLSConfig() + newCfg.Certificates = []tls.Certificate{cert} + s.clientTLSCfg.SetTLSConfig(newCfg) +} diff --git a/watch/funcs.go b/watch/funcs.go index 20265decc6..8c58236336 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -3,6 +3,7 @@ package watch import ( "context" "fmt" + "log" consulapi "github.com/hashicorp/consul/api" ) @@ -16,13 +17,16 @@ var watchFuncFactory map[string]watchFactory func init() { watchFuncFactory = map[string]watchFactory{ - "key": keyWatch, - "keyprefix": keyPrefixWatch, - "services": servicesWatch, - "nodes": nodesWatch, - "service": serviceWatch, - "checks": checksWatch, - "event": eventWatch, + "key": keyWatch, + "keyprefix": keyPrefixWatch, + "services": servicesWatch, + "nodes": nodesWatch, + "service": serviceWatch, + "checks": checksWatch, + "event": eventWatch, + "connect_roots": connectRootsWatch, + "connect_leaf": connectLeafWatch, + "connect_proxy_config": connectProxyConfigWatch, } } @@ -40,18 +44,18 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { if key == "" { return nil, fmt.Errorf("Must specify a single key to watch") } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() pair, meta, err := kv.Get(key, &opts) if err != nil { - return 0, nil, err + return nil, nil, err } if pair == nil { - return meta.LastIndex, nil, err + return WaitIndexVal(meta.LastIndex), nil, err } - return meta.LastIndex, pair, err + return WaitIndexVal(meta.LastIndex), pair, err } return fn, nil } @@ -70,15 +74,15 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { if prefix == "" { return nil, fmt.Errorf("Must specify a single prefix to watch") } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() pairs, meta, err := kv.List(prefix, &opts) if err != nil { - return 0, nil, err + return nil, nil, err } - return meta.LastIndex, pairs, err + return WaitIndexVal(meta.LastIndex), pairs, err } return fn, nil } @@ -90,15 +94,15 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { - return 0, nil, err + return nil, nil, err } - return meta.LastIndex, services, err + return WaitIndexVal(meta.LastIndex), services, err } return fn, nil } @@ -110,15 +114,15 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { - return 0, nil, err + return nil, nil, err } - return meta.LastIndex, nodes, err + return WaitIndexVal(meta.LastIndex), nodes, err } return fn, nil } @@ -147,15 +151,15 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { - return 0, nil, err + return nil, nil, err } - return meta.LastIndex, nodes, err + return WaitIndexVal(meta.LastIndex), nodes, err } return fn, nil } @@ -181,7 +185,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { state = "any" } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) defer p.cancelFunc() @@ -194,9 +198,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { checks, meta, err = health.Checks(service, &opts) } if err != nil { - return 0, nil, err + return nil, nil, err } - return meta.LastIndex, checks, err + return WaitIndexVal(meta.LastIndex), checks, err } return fn, nil } @@ -210,23 +214,98 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - fn := func(p *Plan) (uint64, interface{}, error) { + fn := func(p *Plan) (BlockingParam, interface{}, error) { event := p.client.Event() opts := makeQueryOptionsWithContext(p, false) defer p.cancelFunc() events, meta, err := event.List(name, &opts) if err != nil { - return 0, nil, err + return nil, nil, err } // Prune to only the new events for i := 0; i < len(events); i++ { - if event.IDToIndex(events[i].ID) == p.lastIndex { + if WaitIndexVal(event.IDToIndex(events[i].ID)).Equal(p.lastParamVal) { events = events[i+1:] break } } - return meta.LastIndex, events, err + return WaitIndexVal(meta.LastIndex), events, err + } + return fn, nil +} + +// connectRootsWatch is used to watch for changes to Connect Root certificates. +func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) { + // We don't support stale since roots are likely to be cached locally in the + // agent anyway. + + fn := func(p *Plan) (BlockingParam, interface{}, error) { + agent := p.client.Agent() + opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() + + roots, meta, err := agent.ConnectCARoots(&opts) + if err != nil { + return nil, nil, err + } + + return WaitIndexVal(meta.LastIndex), roots, err + } + return fn, nil +} + +// connectLeafWatch is used to watch for changes to Connect Leaf certificates +// for given local service id. +func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) { + // We don't support stale since certs are likely to be cached locally in the + // agent anyway. + + var serviceID string + if err := assignValue(params, "service_id", &serviceID); err != nil { + return nil, err + } + + fn := func(p *Plan) (BlockingParam, interface{}, error) { + agent := p.client.Agent() + opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() + + leaf, meta, err := agent.ConnectCALeaf(serviceID, &opts) + if err != nil { + return nil, nil, err + } + + return WaitIndexVal(meta.LastIndex), leaf, err + } + return fn, nil +} + +// connectProxyConfigWatch is used to watch for changes to Connect managed proxy +// configuration. Note that this state is agent-local so the watch mechanism +// uses `hash` rather than `index` for deciding whether to block. +func connectProxyConfigWatch(params map[string]interface{}) (WatcherFunc, error) { + // We don't support consistency modes since it's agent local data + + var proxyServiceID string + if err := assignValue(params, "proxy_service_id", &proxyServiceID); err != nil { + return nil, err + } + + fn := func(p *Plan) (BlockingParam, interface{}, error) { + agent := p.client.Agent() + opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() + + log.Printf("DEBUG: id: %s, opts: %v", proxyServiceID, opts) + + config, _, err := agent.ConnectProxyConfig(proxyServiceID, &opts) + if err != nil { + return nil, nil, err + } + + // Return string ContentHash since we don't have Raft indexes to block on. + return WaitHashVal(config.ContentHash), config, err } return fn, nil } @@ -234,6 +313,12 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { ctx, cancel := context.WithCancel(context.Background()) p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + opts := consulapi.QueryOptions{AllowStale: stale} + switch param := p.lastParamVal.(type) { + case WaitIndexVal: + opts.WaitIndex = uint64(param) + case WaitHashVal: + opts.WaitHash = string(param) + } return *opts.WithContext(ctx) } diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 190ae24faa..89c5a1e801 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -8,8 +8,10 @@ import ( "time" "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/structs" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/watch" + "github.com/stretchr/testify/require" ) var errBadContent = errors.New("bad content") @@ -30,7 +32,7 @@ func TestKeyWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -84,7 +86,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -138,7 +140,7 @@ func TestKeyPrefixWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -191,7 +193,7 @@ func TestServicesWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"services"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -245,7 +247,7 @@ func TestNodesWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"nodes"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -296,7 +298,7 @@ func TestServiceWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -352,7 +354,7 @@ func TestChecksWatch_State(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "state":"warning"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -413,7 +415,7 @@ func TestChecksWatch_Service(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return // ignore } @@ -479,7 +481,7 @@ func TestEventWatch(t *testing.T) { invoke := makeInvokeCh() plan := mustParse(t, `{"type":"event", "name": "foo"}`) - plan.Handler = func(idx uint64, raw interface{}) { + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { if raw == nil { return } @@ -523,6 +525,220 @@ func TestEventWatch(t *testing.T) { wg.Wait() } +func TestConnectRootsWatch(t *testing.T) { + // TODO(banks) enable and make it work once this is supported. Note that this + // test actually passes currently just by busy-polling the roots endpoint + // until it changes. + t.Skip("CA and Leaf implementation don't actually support blocking yet") + t.Parallel() + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + + invoke := makeInvokeCh() + plan := mustParse(t, `{"type":"connect_roots"}`) + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*consulapi.CARootList) + if !ok || v == nil { + return // ignore + } + // TODO(banks): verify the right roots came back. + invoke <- nil + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(20 * time.Millisecond) + // TODO(banks): this is a hack since CA config is in flux. We _did_ expose a + // temporary agent endpoint for PUTing config, but didn't expose it in `api` + // package intentionally. If we are going to hack around with temporary API, + // we can might as well drop right down to the RPC level... + args := structs.CAConfiguration{ + Provider: "static", + Config: map[string]interface{}{ + "Name": "test-1", + "Generate": true, + }, + } + var reply interface{} + if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + wg.Wait() +} + +func TestConnectLeafWatch(t *testing.T) { + // TODO(banks) enable and make it work once this is supported. + t.Skip("CA and Leaf implementation don't actually support blocking yet") + t.Parallel() + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + + // Register a web service to get certs for + { + agent := a.Client().Agent() + reg := consulapi.AgentServiceRegistration{ + ID: "web", + Name: "web", + Port: 9090, + } + err := agent.ServiceRegister(®) + require.Nil(t, err) + } + + // Setup a new generated CA + // + // TODO(banks): this is a hack since CA config is in flux. We _did_ expose a + // temporary agent endpoint for PUTing config, but didn't expose it in `api` + // package intentionally. If we are going to hack around with temporary API, + // we can might as well drop right down to the RPC level... + args := structs.CAConfiguration{ + Provider: "static", + Config: map[string]interface{}{ + "Name": "test-1", + "Generate": true, + }, + } + var reply interface{} + if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + invoke := makeInvokeCh() + plan := mustParse(t, `{"type":"connect_leaf", "service_id":"web"}`) + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*consulapi.LeafCert) + if !ok || v == nil { + return // ignore + } + // TODO(banks): verify the right leaf came back. + invoke <- nil + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(20 * time.Millisecond) + + // Change the CA which should eventually trigger a leaf change but probably + // won't now so this test has no way to succeed yet. + args := structs.CAConfiguration{ + Provider: "static", + Config: map[string]interface{}{ + "Name": "test-2", + "Generate": true, + }, + } + var reply interface{} + if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + wg.Wait() +} + +func TestConnectProxyConfigWatch(t *testing.T) { + t.Parallel() + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + + // Register a local agent service with a managed proxy + reg := &consulapi.AgentServiceRegistration{ + Name: "web", + Port: 8080, + Connect: &consulapi.AgentServiceConnect{ + Proxy: &consulapi.AgentServiceConnectProxy{ + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + } + client := a.Client() + agent := client.Agent() + err := agent.ServiceRegister(reg) + require.NoError(t, err) + + invoke := makeInvokeCh() + plan := mustParse(t, `{"type":"connect_proxy_config", "proxy_service_id":"web-proxy"}`) + plan.Handler = func(blockParam watch.BlockingParam, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*consulapi.ConnectProxyConfig) + if !ok || v == nil { + return // ignore + } + invoke <- nil + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(20 * time.Millisecond) + + // Change the proxy's config + reg.Connect.Proxy.Config["foo"] = "buzz" + reg.Connect.Proxy.Config["baz"] = "qux" + err := agent.ServiceRegister(reg) + require.NoError(t, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + wg.Wait() +} + func mustParse(t *testing.T, q string) *watch.Plan { var params map[string]interface{} if err := json.Unmarshal([]byte(q), ¶ms); err != nil { diff --git a/watch/plan.go b/watch/plan.go index fff9da7c7c..6292c19a41 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -37,7 +37,6 @@ func (p *Plan) RunWithConfig(address string, conf *consulapi.Config) error { if err != nil { return fmt.Errorf("Failed to connect to agent: %v", err) } - p.client = client // Create the logger output := p.LogOutput @@ -46,12 +45,24 @@ func (p *Plan) RunWithConfig(address string, conf *consulapi.Config) error { } logger := log.New(output, "", log.LstdFlags) + return p.RunWithClientAndLogger(client, logger) +} + +// RunWithClientAndLogger runs a watch plan using an external client and +// log.Logger instance. Using this, the plan's Datacenter, Token and LogOutput +// fields are ignored and the passed client is expected to be configured as +// needed. +func (p *Plan) RunWithClientAndLogger(client *consulapi.Client, + logger *log.Logger) error { + + p.client = client + // Loop until we are canceled failures := 0 OUTER: for !p.shouldStop() { // Invoke the handler - index, result, err := p.Watcher(p) + blockParamVal, result, err := p.Watcher(p) // Check if we should terminate since the function // could have blocked for a while @@ -63,7 +74,11 @@ OUTER: if err != nil { // Perform an exponential backoff failures++ - p.lastIndex = 0 + if blockParamVal == nil { + p.lastParamVal = nil + } else { + p.lastParamVal = blockParamVal.Next(p.lastParamVal) + } retry := retryInterval * time.Duration(failures*failures) if retry > maxBackoffTime { retry = maxBackoffTime @@ -82,24 +97,21 @@ OUTER: failures = 0 // If the index is unchanged do nothing - if index == p.lastIndex { + if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) { continue } // Update the index, look for change - oldIndex := p.lastIndex - p.lastIndex = index - if oldIndex != 0 && reflect.DeepEqual(p.lastResult, result) { + oldParamVal := p.lastParamVal + p.lastParamVal = blockParamVal.Next(oldParamVal) + if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { continue } - if p.lastIndex < oldIndex { - p.lastIndex = 0 - } // Handle the updated result p.lastResult = result if p.Handler != nil { - p.Handler(index, result) + p.Handler(blockParamVal, result) } } return nil diff --git a/watch/plan_test.go b/watch/plan_test.go index 16e4cfbc21..6099dc2943 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -10,9 +10,12 @@ func init() { } func noopWatch(params map[string]interface{}) (WatcherFunc, error) { - fn := func(p *Plan) (uint64, interface{}, error) { - idx := p.lastIndex + 1 - return idx, idx, nil + fn := func(p *Plan) (BlockingParam, interface{}, error) { + idx := WaitIndexVal(0) + if i, ok := p.lastParamVal.(WaitIndexVal); ok { + idx = i + } + return idx + 1, uint64(idx + 1), nil } return fn, nil } @@ -32,7 +35,12 @@ func TestRun_Stop(t *testing.T) { var expect uint64 = 1 doneCh := make(chan struct{}) - plan.Handler = func(idx uint64, val interface{}) { + plan.Handler = func(blockParamVal BlockingParam, val interface{}) { + idxVal, ok := blockParamVal.(WaitIndexVal) + if !ok { + t.Fatalf("Expected index-based watch") + } + idx := uint64(idxVal) if idx != expect { t.Fatalf("Bad: %d %d", expect, idx) } diff --git a/watch/watch.go b/watch/watch.go index cdf5342960..b520d702e8 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -28,10 +28,10 @@ type Plan struct { Handler HandlerFunc LogOutput io.Writer - address string - client *consulapi.Client - lastIndex uint64 - lastResult interface{} + address string + client *consulapi.Client + lastParamVal BlockingParam + lastResult interface{} stop bool stopCh chan struct{} @@ -48,11 +48,72 @@ type HttpHandlerConfig struct { TLSSkipVerify bool `mapstructure:"tls_skip_verify"` } -// WatcherFunc is used to watch for a diff -type WatcherFunc func(*Plan) (uint64, interface{}, error) +// BlockingParam is an interface representing the common operations needed for +// different styles of blocking. It's used to abstract the core watch plan from +// whether we are performing index-based or hash-based blocking. +type BlockingParam interface { + // Equal returns whether the other param value should be considered equal + // (i.e. representing no change in the watched resource). Equal must not panic + // if other is nil. + Equal(other BlockingParam) bool + + // Next is called when deciding which value to use on the next blocking call. + // It assumes the BlockingParam value it is called on is the most recent one + // returned and passes the previous one which may be nil as context. This + // allows types to customise logic around ordering without assuming there is + // an order. For example WaitIndexVal can check that the index didn't go + // backwards and if it did then reset to 0. Most other cases should just + // return themselves (the most recent value) to be used in the next request. + Next(previous BlockingParam) BlockingParam +} + +// WaitIndexVal is a type representing a Consul index that implements +// BlockingParam. +type WaitIndexVal uint64 + +// Equal implements BlockingParam +func (idx WaitIndexVal) Equal(other BlockingParam) bool { + if otherIdx, ok := other.(WaitIndexVal); ok { + return idx == otherIdx + } + return false +} + +// Next implements BlockingParam +func (idx WaitIndexVal) Next(previous BlockingParam) BlockingParam { + if previous == nil { + return idx + } + prevIdx, ok := previous.(WaitIndexVal) + if ok && prevIdx > idx { + // This value is smaller than the previous index, reset. + return WaitIndexVal(0) + } + return idx +} + +// WaitHashVal is a type representing a Consul content hash that implements +// BlockingParam. +type WaitHashVal string + +// Equal implements BlockingParam +func (h WaitHashVal) Equal(other BlockingParam) bool { + if otherHash, ok := other.(WaitHashVal); ok { + return h == otherHash + } + return false +} + +// Next implements BlockingParam +func (h WaitHashVal) Next(previous BlockingParam) BlockingParam { + return h +} + +// WatcherFunc is used to watch for a diff. +type WatcherFunc func(*Plan) (BlockingParam, interface{}, error) // HandlerFunc is used to handle new data -type HandlerFunc func(uint64, interface{}) +type HandlerFunc func(BlockingParam, interface{}) // Parse takes a watch query and compiles it into a WatchPlan or an error func Parse(params map[string]interface{}) (*Plan, error) {