diff --git a/watch/funcs.go b/watch/funcs.go index a6d1015964..bad10bec05 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -21,6 +21,7 @@ func init() { "nodes": nodesWatch, "service": serviceWatch, "checks": checksWatch, + "event": eventWatch, } } @@ -164,3 +165,30 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) { } return fn, nil } + +// eventWatch is used to watch for events, optionally filtering on name +func eventWatch(params map[string]interface{}) (WatchFunc, error) { + var name string + if err := assignValue(params, "name", &name); err != nil { + return nil, err + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + event := p.client.Event() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + events, meta, err := event.List(name, &opts) + if err != nil { + return 0, nil, err + } + + // Prune to only the new events + for i := 0; i < len(events); i++ { + if event.IDToIndex(events[i].ID) == p.lastIndex { + events = events[i+1:] + break + } + } + return meta.LastIndex, events, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index d0dbb0c8f2..fa0ff87741 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -392,3 +392,43 @@ func TestChecksWatch_Service(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestEventWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"event", "name": "foo"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.UserEvent) + if !ok || len(v) == 0 || string(v[len(v)-1].Name) != "foo" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + event := plan.client.Event() + params := &consulapi.UserEvent{Name: "foo"} + if _, _, err := event.Fire(params, nil); err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +}