diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 0ebec469b9..38d2b5f084 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -687,6 +687,135 @@ func TestKVS_Apply_LockDelay(t *testing.T) { } } +func TestKVS_Issue_1626(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Set up the first key. + { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo/test", + Value: []byte("test"), + }, + } + var out bool + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Retrieve the base key and snag the index. + var index uint64 + { + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "foo/test", + } + var dirent structs.IndexedDirEntries + if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 1 { + t.Fatalf("Bad: %v", dirent) + } + d := dirent.Entries[0] + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } + + index = dirent.Index + } + + // Set up a blocking query on the base key. + doneCh := make(chan *structs.IndexedDirEntries, 1) + go func() { + codec := rpcClient(t, s1) + defer codec.Close() + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "foo/test", + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: 3 * time.Second, + }, + } + var dirent structs.IndexedDirEntries + if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + doneCh <- &dirent + }() + + // Now update a second key with a prefix that has the first key name + // as part of it. + { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo/test2", + Value: []byte("test"), + }, + } + var out bool + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Make sure the blocking query didn't wake up for this update. + select { + case <-doneCh: + t.Fatalf("Blocking query should not have completed") + case <-time.After(1 * time.Second): + } + + // Now update the first key's payload. + { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo/test", + Value: []byte("updated"), + }, + } + var out bool + if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Make sure the blocking query wakes up for the final update. + select { + case dirent := <-doneCh: + if dirent.Index <= index { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 1 { + t.Fatalf("Bad: %v", dirent) + } + d := dirent.Entries[0] + if string(d.Value) != "updated" { + t.Fatalf("bad: %v", d) + } + case <-time.After(1 * time.Second): + t.Fatalf("Blocking query should have completed") + } +} + var testListRules = ` key "" { policy = "deny" diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 76a8b5f3b7..412f36c2dd 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -45,7 +45,7 @@ type StateStore struct { tableWatches map[string]*FullTableWatch // kvsWatch holds the special prefix watch for the key value store. - kvsWatch *PrefixWatch + kvsWatch *PrefixWatchManager // kvsGraveyard manages tombstones for the key value store. kvsGraveyard *Graveyard @@ -110,7 +110,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) { schema: schema, db: db, tableWatches: tableWatches, - kvsWatch: NewPrefixWatch(), + kvsWatch: NewPrefixWatchManager(), kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), } @@ -448,7 +448,7 @@ func (s *StateStore) GetQueryWatch(method string) Watch { // GetKVSWatch returns a watch for the given prefix in the key value store. func (s *StateStore) GetKVSWatch(prefix string) Watch { - return s.kvsWatch.GetSubwatch(prefix) + return s.kvsWatch.NewPrefixWatch(prefix) } // EnsureRegistration is used to make sure a node, service, and check diff --git a/consul/state/watch.go b/consul/state/watch.go index f8aa273a82..93a3329b07 100644 --- a/consul/state/watch.go +++ b/consul/state/watch.go @@ -80,9 +80,29 @@ func (d *DumbWatchManager) Notify() { } } -// PrefixWatch maintains a notify group for each prefix, allowing for much more -// fine-grained watches. +// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager, +// bound to a specific prefix. type PrefixWatch struct { + // manager is the underlying watch manager. + manager *PrefixWatchManager + + // prefix is the prefix we are watching. + prefix string +} + +// Wait registers the given channel with the notify group for our prefix. +func (w *PrefixWatch) Wait(notifyCh chan struct{}) { + w.manager.Wait(w.prefix, notifyCh) +} + +// Clear deregisters the given channel from the the notify group for our prefix. +func (w *PrefixWatch) Clear(notifyCh chan struct{}) { + w.manager.Clear(w.prefix, notifyCh) +} + +// PrefixWatchManager maintains a notify group for each prefix, allowing for +// much more fine-grained watches. +type PrefixWatchManager struct { // watches has the set of notify groups, organized by prefix. watches *radix.Tree @@ -90,37 +110,59 @@ type PrefixWatch struct { lock sync.Mutex } -// NewPrefixWatch returns a new prefix watch. -func NewPrefixWatch() *PrefixWatch { - return &PrefixWatch{ +// NewPrefixWatchManager returns a new prefix watch manager. +func NewPrefixWatchManager() *PrefixWatchManager { + return &PrefixWatchManager{ watches: radix.New(), } } -// GetSubwatch returns the notify group for the given prefix. -func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup { +// NewPrefixWatch returns a Watch-compatible interface for watching the given +// prefix. +func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch { + return &PrefixWatch{ + manager: w, + prefix: prefix, + } +} + +// Wait registers the given channel on a prefix. +func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) { + w.lock.Lock() + defer w.lock.Unlock() + + var group *NotifyGroup + if raw, ok := w.watches.Get(prefix); ok { + group = raw.(*NotifyGroup) + } else { + group = &NotifyGroup{} + w.watches.Insert(prefix, group) + } + group.Wait(notifyCh) +} + +// Clear deregisters the given channel from the notify group for a prefix (if +// one exists). +func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) { w.lock.Lock() defer w.lock.Unlock() if raw, ok := w.watches.Get(prefix); ok { - return raw.(*NotifyGroup) + group := raw.(*NotifyGroup) + group.Clear(notifyCh) } - - group := &NotifyGroup{} - w.watches.Insert(prefix, group) - return group } // Notify wakes up all the watchers associated with the given prefix. If subtree // is true then we will also notify all the tree under the prefix, such as when // a key is being deleted. -func (w *PrefixWatch) Notify(prefix string, subtree bool) { +func (w *PrefixWatchManager) Notify(prefix string, subtree bool) { w.lock.Lock() defer w.lock.Unlock() var cleanup []string - fn := func(k string, v interface{}) bool { - group := v.(*NotifyGroup) + fn := func(k string, raw interface{}) bool { + group := raw.(*NotifyGroup) group.Notify() if k != "" { cleanup = append(cleanup, k) diff --git a/consul/state/watch_test.go b/consul/state/watch_test.go index 64f08df06e..6eaf85d678 100644 --- a/consul/state/watch_test.go +++ b/consul/state/watch_test.go @@ -1,6 +1,8 @@ package state import ( + "sort" + "strings" "testing" ) @@ -163,14 +165,106 @@ func TestWatch_DumbWatchManager(t *testing.T) { }() } +func verifyWatches(t *testing.T, w *PrefixWatchManager, expected string) { + var found []string + fn := func(k string, v interface{}) bool { + if k == "" { + k = "(full)" + } + found = append(found, k) + return false + } + w.watches.WalkPrefix("", fn) + + sort.Strings(found) + actual := strings.Join(found, "|") + if expected != actual { + t.Fatalf("bad: %s != %s", expected, actual) + } +} + +func TestWatch_PrefixWatchManager(t *testing.T) { + w := NewPrefixWatchManager() + verifyWatches(t, w, "") + + // This will create the watch group. + ch1 := make(chan struct{}, 1) + w.Wait("hello", ch1) + verifyWatches(t, w, "hello") + + // This will add to the existing one. + ch2 := make(chan struct{}, 1) + w.Wait("hello", ch2) + verifyWatches(t, w, "hello") + + // This will add to the existing as well. + ch3 := make(chan struct{}, 1) + w.Wait("hello", ch3) + verifyWatches(t, w, "hello") + + // Remove one of the watches. + w.Clear("hello", ch2) + verifyWatches(t, w, "hello") + + // Do "clear" for one that was never added. + ch4 := make(chan struct{}, 1) + w.Clear("hello", ch4) + verifyWatches(t, w, "hello") + + // Add a full table watch. + full := make(chan struct{}, 1) + w.Wait("", full) + verifyWatches(t, w, "(full)|hello") + + // Add another channel for a different prefix. + nope := make(chan struct{}, 1) + w.Wait("nope", nope) + verifyWatches(t, w, "(full)|hello|nope") + + // Fire off the notification and make sure channels were pinged (or not) + // as expected. + w.Notify("hello", false) + verifyWatches(t, w, "(full)|nope") + select { + case <-ch1: + default: + t.Fatalf("ch1 should have been notified") + } + select { + case <-ch2: + t.Fatalf("ch2 should not have been notified") + default: + } + select { + case <-ch3: + default: + t.Fatalf("ch3 should have been notified") + } + select { + case <-ch4: + t.Fatalf("ch4 should not have been notified") + default: + } + select { + case <-nope: + t.Fatalf("nope should not have been notified") + default: + } + select { + case <-full: + default: + t.Fatalf("full should have been notified") + } +} + func TestWatch_PrefixWatch(t *testing.T) { - w := NewPrefixWatch() + w := NewPrefixWatchManager() // Hit a specific key. - verifyWatch(t, w.GetSubwatch(""), func() { - verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() { - verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.GetSubwatch("nope"), func() { + verifyWatch(t, w.NewPrefixWatch(""), func() { + verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { + verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { w.Notify("foo/bar/baz", false) }) }) @@ -179,35 +273,39 @@ func TestWatch_PrefixWatch(t *testing.T) { // Make sure cleanup is happening. All that should be left is the // full-table watch and the un-fired watches. - fn := func(k string, v interface{}) bool { - if k != "" && k != "foo/bar/zoo" && k != "nope" { - t.Fatalf("unexpected watch: %s", k) - } - return false - } - w.watches.WalkPrefix("", fn) + verifyWatches(t, w, "(full)|foo/bar/zoo|nope") // Delete a subtree. - verifyWatch(t, w.GetSubwatch(""), func() { - verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() { - verifyWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.GetSubwatch("nope"), func() { + verifyWatch(t, w.NewPrefixWatch(""), func() { + verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { + verifyWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { w.Notify("foo/", true) }) }) }) }) + verifyWatches(t, w, "(full)|nope") // Hit an unknown key. - verifyWatch(t, w.GetSubwatch(""), func() { - verifyNoWatch(t, w.GetSubwatch("foo/bar/baz"), func() { - verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() { - verifyNoWatch(t, w.GetSubwatch("nope"), func() { + verifyWatch(t, w.NewPrefixWatch(""), func() { + verifyNoWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() { + verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() { + verifyNoWatch(t, w.NewPrefixWatch("nope"), func() { w.Notify("not/in/there", false) }) }) }) }) + verifyWatches(t, w, "(full)|foo/bar/baz|foo/bar/zoo|nope") + + // Make sure a watch can be reused. + watch := w.NewPrefixWatch("over/and/over") + for i := 0; i < 10; i++ { + verifyWatch(t, watch, func() { + w.Notify("over/and/over", false) + }) + } } type MockWatch struct {