mirror of https://github.com/status-im/consul.git
watch: Adding support for events
This commit is contained in:
parent
c3bb7de75d
commit
e13ba2f2da
|
@ -21,6 +21,7 @@ func init() {
|
|||
"nodes": nodesWatch,
|
||||
"service": serviceWatch,
|
||||
"checks": checksWatch,
|
||||
"event": eventWatch,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,3 +165,30 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
|
|||
}
|
||||
return fn, nil
|
||||
}
|
||||
|
||||
// eventWatch is used to watch for events, optionally filtering on name
|
||||
func eventWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||
var name string
|
||||
if err := assignValue(params, "name", &name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
||||
event := p.client.Event()
|
||||
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
||||
events, meta, err := event.List(name, &opts)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
// Prune to only the new events
|
||||
for i := 0; i < len(events); i++ {
|
||||
if event.IDToIndex(events[i].ID) == p.lastIndex {
|
||||
events = events[i+1:]
|
||||
break
|
||||
}
|
||||
}
|
||||
return meta.LastIndex, events, err
|
||||
}
|
||||
return fn, nil
|
||||
}
|
||||
|
|
|
@ -392,3 +392,43 @@ func TestChecksWatch_Service(t *testing.T) {
|
|||
t.Fatalf("bad: %v", invoke)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventWatch(t *testing.T) {
|
||||
if consulAddr == "" {
|
||||
t.Skip()
|
||||
}
|
||||
plan := mustParse(t, `{"type":"event", "name": "foo"}`)
|
||||
invoke := 0
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if invoke == 0 {
|
||||
if raw == nil {
|
||||
return
|
||||
}
|
||||
v, ok := raw.([]*consulapi.UserEvent)
|
||||
if !ok || len(v) == 0 || string(v[len(v)-1].Name) != "foo" {
|
||||
t.Fatalf("Bad: %#v", raw)
|
||||
}
|
||||
invoke++
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer plan.Stop()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
event := plan.client.Event()
|
||||
params := &consulapi.UserEvent{Name: "foo"}
|
||||
if _, _, err := event.Fire(params, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err := plan.Run(consulAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if invoke == 0 {
|
||||
t.Fatalf("bad: %v", invoke)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue