watch: Support for key prefix

This commit is contained in:
Armon Dadgar 2014-08-20 15:22:22 -07:00
parent 68a829119e
commit 970c606f1a
2 changed files with 145 additions and 1 deletions

View File

@ -15,7 +15,8 @@ var watchFuncFactory map[string]watchFactory
func init() {
watchFuncFactory = map[string]watchFactory{
"key": keyWatch,
"key": keyWatch,
"keyprefix": keyPrefixWatch,
}
}
@ -42,3 +43,24 @@ func keyWatch(params map[string][]string) (WatchFunc, error) {
}
return fn, nil
}
// keyPrefixWatch is used to return a key prefix watching function
func keyPrefixWatch(params map[string][]string) (WatchFunc, error) {
list := params["prefix"]
delete(params, "prefix")
if len(list) != 1 {
return nil, fmt.Errorf("Must specify a single prefix to watch")
}
prefix := list[0]
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, pairs, err
}
return fn, nil
}

122
watch/funcs_test.go Normal file
View File

@ -0,0 +1,122 @@
package watch
import (
"os"
"testing"
"time"
"github.com/armon/consul-api"
)
var consulAddr string
func init() {
consulAddr = os.Getenv("CONSUL_ADDR")
}
func TestKeyWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, "type:key key:foo/bar/baz")
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(*consulapi.KVPair)
if !ok || v == nil || string(v.Value) != "test" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
kv := plan.client.KV()
pair := &consulapi.KVPair{
Key: "foo/bar/baz",
Value: []byte("test"),
}
_, err := kv.Put(pair, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the query to run
time.Sleep(20 * time.Millisecond)
plan.Stop()
// Delete the key
_, err = kv.Delete("foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}
func TestKeyPrefixWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, "type:keyprefix prefix:foo/")
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(consulapi.KVPairs)
if !ok || v == nil || string(v[0].Key) != "foo/bar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
defer plan.Stop()
time.Sleep(20 * time.Millisecond)
kv := plan.client.KV()
pair := &consulapi.KVPair{
Key: "foo/bar",
}
_, err := kv.Put(pair, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the query to run
time.Sleep(20 * time.Millisecond)
plan.Stop()
// Delete the key
_, err = kv.Delete("foo/bar", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}