Prevents watches from being orphaned when KVS blocking queries loop.

This commit is contained in:
James Phillips 2016-01-19 21:46:22 -08:00
parent b6f03d39cc
commit 01da5a2248
4 changed files with 307 additions and 38 deletions

View File

@ -687,6 +687,135 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
}
}
func TestKVS_Issue_1626(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Set up the first key.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test",
Value: []byte("test"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Retrieve the base key and snag the index.
var index uint64
{
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "foo/test",
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "test" {
t.Fatalf("bad: %v", d)
}
index = dirent.Index
}
// Set up a blocking query on the base key.
doneCh := make(chan *structs.IndexedDirEntries, 1)
go func() {
codec := rpcClient(t, s1)
defer codec.Close()
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "foo/test",
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: 3 * time.Second,
},
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
doneCh <- &dirent
}()
// Now update a second key with a prefix that has the first key name
// as part of it.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test2",
Value: []byte("test"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Make sure the blocking query didn't wake up for this update.
select {
case <-doneCh:
t.Fatalf("Blocking query should not have completed")
case <-time.After(1 * time.Second):
}
// Now update the first key's payload.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test",
Value: []byte("updated"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// Make sure the blocking query wakes up for the final update.
select {
case dirent := <-doneCh:
if dirent.Index <= index {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "updated" {
t.Fatalf("bad: %v", d)
}
case <-time.After(1 * time.Second):
t.Fatalf("Blocking query should have completed")
}
}
var testListRules = `
key "" {
policy = "deny"

View File

@ -45,7 +45,7 @@ type StateStore struct {
tableWatches map[string]*FullTableWatch
// kvsWatch holds the special prefix watch for the key value store.
kvsWatch *PrefixWatch
kvsWatch *PrefixWatchManager
// kvsGraveyard manages tombstones for the key value store.
kvsGraveyard *Graveyard
@ -110,7 +110,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
schema: schema,
db: db,
tableWatches: tableWatches,
kvsWatch: NewPrefixWatch(),
kvsWatch: NewPrefixWatchManager(),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
}
@ -448,7 +448,7 @@ func (s *StateStore) GetQueryWatch(method string) Watch {
// GetKVSWatch returns a watch for the given prefix in the key value store.
func (s *StateStore) GetKVSWatch(prefix string) Watch {
return s.kvsWatch.GetSubwatch(prefix)
return s.kvsWatch.NewPrefixWatch(prefix)
}
// EnsureRegistration is used to make sure a node, service, and check

View File

@ -80,9 +80,29 @@ func (d *DumbWatchManager) Notify() {
}
}
// PrefixWatch maintains a notify group for each prefix, allowing for much more
// fine-grained watches.
// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager,
// bound to a specific prefix.
type PrefixWatch struct {
// manager is the underlying watch manager.
manager *PrefixWatchManager
// prefix is the prefix we are watching.
prefix string
}
// Wait registers the given channel with the notify group for our prefix.
func (w *PrefixWatch) Wait(notifyCh chan struct{}) {
w.manager.Wait(w.prefix, notifyCh)
}
// Clear deregisters the given channel from the the notify group for our prefix.
func (w *PrefixWatch) Clear(notifyCh chan struct{}) {
w.manager.Clear(w.prefix, notifyCh)
}
// PrefixWatchManager maintains a notify group for each prefix, allowing for
// much more fine-grained watches.
type PrefixWatchManager struct {
// watches has the set of notify groups, organized by prefix.
watches *radix.Tree
@ -90,37 +110,59 @@ type PrefixWatch struct {
lock sync.Mutex
}
// NewPrefixWatch returns a new prefix watch.
func NewPrefixWatch() *PrefixWatch {
return &PrefixWatch{
// NewPrefixWatchManager returns a new prefix watch manager.
func NewPrefixWatchManager() *PrefixWatchManager {
return &PrefixWatchManager{
watches: radix.New(),
}
}
// GetSubwatch returns the notify group for the given prefix.
func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
// NewPrefixWatch returns a Watch-compatible interface for watching the given
// prefix.
func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch {
return &PrefixWatch{
manager: w,
prefix: prefix,
}
}
// Wait registers the given channel on a prefix.
func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()
var group *NotifyGroup
if raw, ok := w.watches.Get(prefix); ok {
group = raw.(*NotifyGroup)
} else {
group = &NotifyGroup{}
w.watches.Insert(prefix, group)
}
group.Wait(notifyCh)
}
// Clear deregisters the given channel from the notify group for a prefix (if
// one exists).
func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()
if raw, ok := w.watches.Get(prefix); ok {
return raw.(*NotifyGroup)
group := raw.(*NotifyGroup)
group.Clear(notifyCh)
}
group := &NotifyGroup{}
w.watches.Insert(prefix, group)
return group
}
// Notify wakes up all the watchers associated with the given prefix. If subtree
// is true then we will also notify all the tree under the prefix, such as when
// a key is being deleted.
func (w *PrefixWatch) Notify(prefix string, subtree bool) {
func (w *PrefixWatchManager) Notify(prefix string, subtree bool) {
w.lock.Lock()
defer w.lock.Unlock()
var cleanup []string
fn := func(k string, v interface{}) bool {
group := v.(*NotifyGroup)
fn := func(k string, raw interface{}) bool {
group := raw.(*NotifyGroup)
group.Notify()
if k != "" {
cleanup = append(cleanup, k)

View File

@ -1,6 +1,8 @@
package state
import (
"sort"
"strings"
"testing"
)
@ -163,14 +165,106 @@ func TestWatch_DumbWatchManager(t *testing.T) {
}()
}
func verifyWatches(t *testing.T, w *PrefixWatchManager, expected string) {
var found []string
fn := func(k string, v interface{}) bool {
if k == "" {
k = "(full)"
}
found = append(found, k)
return false
}
w.watches.WalkPrefix("", fn)
sort.Strings(found)
actual := strings.Join(found, "|")
if expected != actual {
t.Fatalf("bad: %s != %s", expected, actual)
}
}
func TestWatch_PrefixWatchManager(t *testing.T) {
w := NewPrefixWatchManager()
verifyWatches(t, w, "")
// This will create the watch group.
ch1 := make(chan struct{}, 1)
w.Wait("hello", ch1)
verifyWatches(t, w, "hello")
// This will add to the existing one.
ch2 := make(chan struct{}, 1)
w.Wait("hello", ch2)
verifyWatches(t, w, "hello")
// This will add to the existing as well.
ch3 := make(chan struct{}, 1)
w.Wait("hello", ch3)
verifyWatches(t, w, "hello")
// Remove one of the watches.
w.Clear("hello", ch2)
verifyWatches(t, w, "hello")
// Do "clear" for one that was never added.
ch4 := make(chan struct{}, 1)
w.Clear("hello", ch4)
verifyWatches(t, w, "hello")
// Add a full table watch.
full := make(chan struct{}, 1)
w.Wait("", full)
verifyWatches(t, w, "(full)|hello")
// Add another channel for a different prefix.
nope := make(chan struct{}, 1)
w.Wait("nope", nope)
verifyWatches(t, w, "(full)|hello|nope")
// Fire off the notification and make sure channels were pinged (or not)
// as expected.
w.Notify("hello", false)
verifyWatches(t, w, "(full)|nope")
select {
case <-ch1:
default:
t.Fatalf("ch1 should have been notified")
}
select {
case <-ch2:
t.Fatalf("ch2 should not have been notified")
default:
}
select {
case <-ch3:
default:
t.Fatalf("ch3 should have been notified")
}
select {
case <-ch4:
t.Fatalf("ch4 should not have been notified")
default:
}
select {
case <-nope:
t.Fatalf("nope should not have been notified")
default:
}
select {
case <-full:
default:
t.Fatalf("full should have been notified")
}
}
func TestWatch_PrefixWatch(t *testing.T) {
w := NewPrefixWatch()
w := NewPrefixWatchManager()
// Hit a specific key.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("foo/bar/baz", false)
})
})
@ -179,35 +273,39 @@ func TestWatch_PrefixWatch(t *testing.T) {
// Make sure cleanup is happening. All that should be left is the
// full-table watch and the un-fired watches.
fn := func(k string, v interface{}) bool {
if k != "" && k != "foo/bar/zoo" && k != "nope" {
t.Fatalf("unexpected watch: %s", k)
}
return false
}
w.watches.WalkPrefix("", fn)
verifyWatches(t, w, "(full)|foo/bar/zoo|nope")
// Delete a subtree.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("foo/", true)
})
})
})
})
verifyWatches(t, w, "(full)|nope")
// Hit an unknown key.
verifyWatch(t, w.GetSubwatch(""), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.GetSubwatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.GetSubwatch("nope"), func() {
verifyWatch(t, w.NewPrefixWatch(""), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/baz"), func() {
verifyNoWatch(t, w.NewPrefixWatch("foo/bar/zoo"), func() {
verifyNoWatch(t, w.NewPrefixWatch("nope"), func() {
w.Notify("not/in/there", false)
})
})
})
})
verifyWatches(t, w, "(full)|foo/bar/baz|foo/bar/zoo|nope")
// Make sure a watch can be reused.
watch := w.NewPrefixWatch("over/and/over")
for i := 0; i < 10; i++ {
verifyWatch(t, watch, func() {
w.Notify("over/and/over", false)
})
}
}
type MockWatch struct {