From f3c8873009dcb4312214c185aa891d4f66659428 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:50:32 -0700 Subject: [PATCH] watch: supporting service watch --- watch/funcs.go | 59 ++++++++++++++++++++++++++++++++++++++------- watch/funcs_test.go | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index a63527af24..a3203b3d6e 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "strconv" "github.com/armon/consul-api" ) @@ -19,19 +20,20 @@ func init() { "keyprefix": keyPrefixWatch, "services": servicesWatch, "nodes": nodesWatch, - "service": nil, + "service": serviceWatch, "checks": nil, } } // keyWatch is used to return a key watching function func keyWatch(params map[string][]string) (WatchFunc, error) { - keys := params["key"] - delete(params, "key") - if len(keys) != 1 { + var key string + if err := assignValue(params, "key", &key); err != nil { + return nil, err + } + if key == "" { return nil, fmt.Errorf("Must specify a single key to watch") } - key := keys[0] fn := func(p *WatchPlan) (uint64, interface{}, error) { kv := p.client.KV() @@ -50,12 +52,13 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { // keyPrefixWatch is used to return a key prefix watching function func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { - list := params["prefix"] - delete(params, "prefix") - if len(list) != 1 { + var prefix string + if err := assignValue(params, "prefix", &prefix); err != nil { + return nil, err + } + if prefix == "" { return nil, fmt.Errorf("Must specify a single prefix to watch") } - prefix := list[0] fn := func(p *WatchPlan) (uint64, interface{}, error) { kv := p.client.KV() @@ -96,3 +99,41 @@ func nodesWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// serviceWatch is used to watch a specific service for changes +func serviceWatch(params map[string][]string) (WatchFunc, error) { + var service, tag, passingRaw string + if err := assignValue(params, "service", &service); err != nil { + return nil, err + } + if service == "" { + return nil, fmt.Errorf("Must specify a single service to watch") + } + + if err := assignValue(params, "tag", &tag); err != nil { + return nil, err + } + + if err := assignValue(params, "passingonly", &passingRaw); err != nil { + return nil, err + } + passingOnly := false + if passingRaw != "" { + b, err := strconv.ParseBool(passingRaw) + if err != nil { + return nil, fmt.Errorf("Failed to parse passingonly value: %v", err) + } + passingOnly = b + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + health := p.client.Health() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := health.Service(service, tag, passingOnly, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 2b06d2a71a..8b13f27a30 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -215,3 +215,52 @@ func TestNodesWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestServiceWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:service service:foo tag:bar passingonly:true") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.ServiceEntry) + if ok && len(v) == 0 { + return + } + if !ok || v[0].Service.ID != "foo" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + agent := plan.client.Agent() + reg := &consulapi.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"bar"}, + } + agent.ServiceRegister(reg) + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + agent.ServiceDeregister("foo") + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +}