From 7da2f513dc97dcbfbc968f10e7fd475f855f3488 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 24 Jan 2017 11:20:51 -0800 Subject: [PATCH] Cuts KVS endpoints over to new fine-grained watch plumbing. --- consul/fsm_test.go | 26 ++--- consul/kvs_endpoint.go | 22 ++-- consul/kvs_endpoint_test.go | 4 +- consul/state/kvs.go | 19 ++-- consul/state/kvs_test.go | 172 +++++++++++++++++++++---------- consul/state/session_test.go | 4 +- consul/state/state_store_test.go | 2 +- consul/state/txn.go | 4 +- consul/state/txn_test.go | 4 +- consul/txn_endpoint_test.go | 2 +- 10 files changed, 161 insertions(+), 98 deletions(-) diff --git a/consul/fsm_test.go b/consul/fsm_test.go index d06a303dc0..46608aa075 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -387,7 +387,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { Value: []byte("foo"), }) fsm.state.KVSDelete(12, "/remove") - idx, _, err := fsm.state.KVSList("/remove") + idx, _, err := fsm.state.KVSList(nil, "/remove") if err != nil { t.Fatalf("err: %s", err) } @@ -491,7 +491,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Verify key is set - _, d, err := fsm2.state.KVSGet("/test") + _, d, err := fsm2.state.KVSGet(nil, "/test") if err != nil { t.Fatalf("err: %v", err) } @@ -617,7 +617,7 @@ func TestFSM_KVSSet(t *testing.T) { } // Verify key is set - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -662,7 +662,7 @@ func TestFSM_KVSDelete(t *testing.T) { } // Verify key is not set - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -708,7 +708,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) { } // Verify key is not set - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -742,7 +742,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { } // Verify key is set - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -763,7 +763,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { } // Verify key is gone - _, d, err = fsm.state.KVSGet("/test/path") + _, d, err = fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -797,7 +797,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { } // Verify key is set - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -819,7 +819,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { } // Verify key is updated - _, d, err = fsm.state.KVSGet("/test/path") + _, d, err = fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -976,7 +976,7 @@ func TestFSM_KVSLock(t *testing.T) { } // Verify key is locked - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -1038,7 +1038,7 @@ func TestFSM_KVSUnlock(t *testing.T) { } // Verify key is unlocked - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } @@ -1234,7 +1234,7 @@ func TestFSM_TombstoneReap(t *testing.T) { Value: []byte("foo"), }) fsm.state.KVSDelete(12, "/remove") - idx, _, err := fsm.state.KVSList("/remove") + idx, _, err := fsm.state.KVSList(nil, "/remove") if err != nil { t.Fatalf("err: %s", err) } @@ -1301,7 +1301,7 @@ func TestFSM_Txn(t *testing.T) { } // Verify key is set directly in the state store. - _, d, err := fsm.state.KVSGet("/test/path") + _, d, err := fsm.state.KVSGet(nil, "/test/path") if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 95ce7576ea..114a764d2d 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" ) // KVS endpoint is used to manipulate the Key-Value store @@ -119,12 +120,11 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC( + return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetKVSWatch(args.Key), - func() error { - index, ent, err := state.KVSGet(args.Key) + func(ws memdb.WatchSet) error { + index, ent, err := state.KVSGet(ws, args.Key) if err != nil { return err } @@ -161,12 +161,11 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC( + return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetKVSWatch(args.Key), - func() error { - index, ent, err := state.KVSList(args.Key) + func(ws memdb.WatchSet) error { + index, ent, err := state.KVSList(ws, args.Key) if err != nil { return err } @@ -204,12 +203,11 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi // Get the local state state := k.srv.fsm.State() - return k.srv.blockingRPC( + return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetKVSWatch(args.Prefix), - func() error { - index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) + func(ws memdb.WatchSet) error { + index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator) if err != nil { return err } diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 50bd58b257..86cd36476c 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -36,7 +36,7 @@ func TestKVS_Apply(t *testing.T) { // Verify state := s1.fsm.State() - _, d, err := state.KVSGet("test") + _, d, err := state.KVSGet(nil, "test") if err != nil { t.Fatalf("err: %v", err) } @@ -58,7 +58,7 @@ func TestKVS_Apply(t *testing.T) { } // Verify - _, d, err = state.KVSGet("test") + _, d, err = state.KVSGet(nil, "test") if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/state/kvs.go b/consul/state/kvs.go index 3dccdebd31..d8f2169731 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -119,24 +119,25 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr } // KVSGet is used to retrieve a key/value pair from the state store. -func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { +func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { tx := s.db.Txn(false) defer tx.Abort() - return s.kvsGetTxn(tx, key) + return s.kvsGetTxn(tx, ws, key) } // kvsGetTxn is the inner method that gets a KVS entry inside an existing // transaction. -func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) { +func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { // Get the table index. idx := maxIndexTxn(tx, "kvs", "tombstones") // Retrieve the key. - entry, err := tx.First("kvs", "id", key) + watchCh, entry, err := tx.FirstWatch("kvs", "id", key) if err != nil { return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) } + ws.Add(watchCh) if entry != nil { return idx, entry.(*structs.DirEntry), nil } @@ -147,16 +148,16 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirE // prefix is left empty, all keys in the KVS will be returned. The returned // is the max index of the returned kvs entries or applicable tombstones, or // else it's the full table indexes for kvs and tombstones. -func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { +func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { tx := s.db.Txn(false) defer tx.Abort() - return s.kvsListTxn(tx, prefix) + return s.kvsListTxn(tx, ws, prefix) } // kvsListTxn is the inner method that gets a list of KVS entries matching a // prefix. -func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.DirEntries, error) { +func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { // Get the table indexes. idx := maxIndexTxn(tx, "kvs", "tombstones") @@ -165,6 +166,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D if err != nil { return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) } + ws.Add(entries.WatchCh()) // Gather all of the keys found in the store var ents structs.DirEntries @@ -203,7 +205,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D // An optional separator may be specified, which can be used to slice off a part // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. -func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) { +func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -215,6 +217,7 @@ func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) { if err != nil { return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) } + ws.Add(entries.WatchCh()) prefixLen := len(prefix) sepLen := len(sep) diff --git a/consul/state/kvs_test.go b/consul/state/kvs_test.go index bd8996a014..f4e6dd56b7 100644 --- a/consul/state/kvs_test.go +++ b/consul/state/kvs_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" ) func TestStateStore_GC(t *testing.T) { @@ -121,7 +122,7 @@ func TestStateStore_ReapTombstones(t *testing.T) { // Pull out the list and check the index, which should come from the // tombstones. - idx, _, err := s.KVSList("foo/") + idx, _, err := s.KVSList(nil, "foo/") if err != nil { t.Fatalf("err: %s", err) } @@ -135,7 +136,7 @@ func TestStateStore_ReapTombstones(t *testing.T) { } // Should still be good because 7 is in there. - idx, _, err = s.KVSList("foo/") + idx, _, err = s.KVSList(nil, "foo/") if err != nil { t.Fatalf("err: %s", err) } @@ -149,7 +150,7 @@ func TestStateStore_ReapTombstones(t *testing.T) { } // At this point the sub index will slide backwards. - idx, _, err = s.KVSList("foo/") + idx, _, err = s.KVSList(nil, "foo/") if err != nil { t.Fatalf("err: %s", err) } @@ -173,7 +174,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { s := testStateStore(t) // Get on an nonexistent key returns nil. - idx, result, err := s.KVSGet("foo") + ws := memdb.NewWatchSet() + idx, result, err := s.KVSGet(ws, "foo") if result != nil || err != nil || idx != 0 { t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) } @@ -186,9 +188,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if err := s.KVSSet(1, entry); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Retrieve the K/V entry again. - idx, result, err = s.KVSGet("foo") + ws = memdb.NewWatchSet() + idx, result, err = s.KVSGet(ws, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -217,9 +223,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if err := s.KVSSet(2, update); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") + ws = memdb.NewWatchSet() + idx, result, err = s.KVSGet(ws, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -242,9 +252,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if err := s.KVSSet(3, update); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") + ws = memdb.NewWatchSet() + idx, result, err = s.KVSGet(ws, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -276,9 +290,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if !ok || err != nil { t.Fatalf("didn't get the lock: %v %s", ok, err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") + ws = memdb.NewWatchSet() + idx, result, err = s.KVSGet(ws, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -304,9 +322,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if err := s.KVSSet(7, update); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") + ws = memdb.NewWatchSet() + idx, result, err = s.KVSGet(ws, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -323,11 +345,17 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { t.Fatalf("bad index: %d", idx) } + // Setting some unrelated key should not fire the watch. + testSetKey(t, s, 8, "other", "yup") + if watchFired(ws) { + t.Fatalf("bad") + } + // Fetch a key that doesn't exist and make sure we get the right // response. - idx, result, err = s.KVSGet("nope") - if result != nil || err != nil || idx != 7 { - t.Fatalf("expected (7, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) + idx, result, err = s.KVSGet(nil, "nope") + if result != nil || err != nil || idx != 8 { + t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) } } @@ -335,7 +363,8 @@ func TestStateStore_KVSList(t *testing.T) { s := testStateStore(t) // Listing an empty KVS returns nothing - idx, entries, err := s.KVSList("") + ws := memdb.NewWatchSet() + idx, entries, err := s.KVSList(ws, "") if idx != 0 || entries != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err) } @@ -346,9 +375,12 @@ func TestStateStore_KVSList(t *testing.T) { testSetKey(t, s, 3, "foo/bar/zip", "zip") testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") testSetKey(t, s, 5, "foo/bar/baz", "baz") + if !watchFired(ws) { + t.Fatalf("bad") + } // List out all of the keys - idx, entries, err = s.KVSList("") + idx, entries, err = s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } @@ -362,7 +394,7 @@ func TestStateStore_KVSList(t *testing.T) { } // Try listing with a provided prefix - idx, entries, err = s.KVSList("foo/bar/zip") + idx, entries, err = s.KVSList(nil, "foo/bar/zip") if err != nil { t.Fatalf("err: %s", err) } @@ -379,10 +411,19 @@ func TestStateStore_KVSList(t *testing.T) { } // Delete a key and make sure the index comes from the tombstone. + ws = memdb.NewWatchSet() + idx, _, err = s.KVSList(ws, "foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } if err := s.KVSDelete(6, "foo/bar/baz"); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("foo/bar/baz") + if !watchFired(ws) { + t.Fatalf("bad") + } + ws = memdb.NewWatchSet() + idx, _, err = s.KVSList(ws, "foo/bar/baz") if err != nil { t.Fatalf("err: %s", err) } @@ -390,11 +431,15 @@ func TestStateStore_KVSList(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Set a different key to bump the index. + // Set a different key to bump the index. This shouldn't fire the + // watch since there's a different prefix. testSetKey(t, s, 7, "some/other/key", "") + if watchFired(ws) { + t.Fatalf("bad") + } // Make sure we get the right index from the tombstone. - idx, _, err = s.KVSList("foo/bar/baz") + idx, _, err = s.KVSList(nil, "foo/bar/baz") if err != nil { t.Fatalf("err: %s", err) } @@ -407,7 +452,7 @@ func TestStateStore_KVSList(t *testing.T) { if err := s.ReapTombstones(6); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("foo/bar/baz") + idx, _, err = s.KVSList(nil, "foo/bar/baz") if err != nil { t.Fatalf("err: %s", err) } @@ -416,7 +461,7 @@ func TestStateStore_KVSList(t *testing.T) { } // List all the keys to make sure the index is also correct. - idx, _, err = s.KVSList("") + idx, _, err = s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } @@ -429,7 +474,8 @@ func TestStateStore_KVSListKeys(t *testing.T) { s := testStateStore(t) // Listing keys with no results returns nil. - idx, keys, err := s.KVSListKeys("", "") + ws := memdb.NewWatchSet() + idx, keys, err := s.KVSListKeys(ws, "", "") if idx != 0 || keys != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err) } @@ -442,9 +488,12 @@ func TestStateStore_KVSListKeys(t *testing.T) { testSetKey(t, s, 5, "foo/bar/zip/zam", "zam") testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp") testSetKey(t, s, 7, "some/other/prefix", "nack") + if !watchFired(ws) { + t.Fatalf("bad") + } // List all the keys. - idx, keys, err = s.KVSListKeys("", "") + idx, keys, err = s.KVSListKeys(nil, "", "") if err != nil { t.Fatalf("err: %s", err) } @@ -456,7 +505,7 @@ func TestStateStore_KVSListKeys(t *testing.T) { } // Query using a prefix and pass a separator. - idx, keys, err = s.KVSListKeys("foo/bar/", "/") + idx, keys, err = s.KVSListKeys(nil, "foo/bar/", "/") if err != nil { t.Fatalf("err: %s", err) } @@ -474,7 +523,7 @@ func TestStateStore_KVSListKeys(t *testing.T) { } // Listing keys with no separator returns everything. - idx, keys, err = s.KVSListKeys("foo", "") + idx, keys, err = s.KVSListKeys(nil, "foo", "") if err != nil { t.Fatalf("err: %s", err) } @@ -488,10 +537,19 @@ func TestStateStore_KVSListKeys(t *testing.T) { } // Delete a key and make sure the index comes from the tombstone. + ws = memdb.NewWatchSet() + idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "") + if err != nil { + t.Fatalf("err: %s", err) + } if err := s.KVSDelete(8, "foo/bar/baz"); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSListKeys("foo/bar/baz", "") + if !watchFired(ws) { + t.Fatalf("bad") + } + ws = memdb.NewWatchSet() + idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "") if err != nil { t.Fatalf("err: %s", err) } @@ -499,11 +557,15 @@ func TestStateStore_KVSListKeys(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Set a different key to bump the index. + // Set a different key to bump the index. This shouldn't fire the watch + // since there's a different prefix. testSetKey(t, s, 9, "some/other/key", "") + if watchFired(ws) { + t.Fatalf("bad") + } // Make sure the index still comes from the tombstone. - idx, _, err = s.KVSListKeys("foo/bar/baz", "") + idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "") if err != nil { t.Fatalf("err: %s", err) } @@ -516,7 +578,7 @@ func TestStateStore_KVSListKeys(t *testing.T) { if err := s.ReapTombstones(8); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSListKeys("foo/bar/baz", "") + idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "") if err != nil { t.Fatalf("err: %s", err) } @@ -525,7 +587,7 @@ func TestStateStore_KVSListKeys(t *testing.T) { } // List all the keys to make sure the index is also correct. - idx, _, err = s.KVSListKeys("", "") + idx, _, err = s.KVSListKeys(nil, "", "") if err != nil { t.Fatalf("err: %s", err) } @@ -573,7 +635,7 @@ func TestStateStore_KVSDelete(t *testing.T) { // Check that the tombstone was created and that prevents the index // from sliding backwards. - idx, _, err := s.KVSList("foo") + idx, _, err := s.KVSList(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -586,7 +648,7 @@ func TestStateStore_KVSDelete(t *testing.T) { if err := s.ReapTombstones(3); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("foo") + idx, _, err = s.KVSList(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -620,7 +682,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) { // Check that the index is untouched and the entry // has not been deleted. - idx, e, err := s.KVSGet("foo") + idx, e, err := s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -639,7 +701,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) { } // Entry was deleted and index was updated - idx, e, err = s.KVSGet("bar") + idx, e, err = s.KVSGet(nil, "bar") if err != nil { t.Fatalf("err: %s", err) } @@ -655,7 +717,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) { // Check that the tombstone was created and that prevents the index // from sliding backwards. - idx, _, err = s.KVSList("bar") + idx, _, err = s.KVSList(nil, "bar") if err != nil { t.Fatalf("err: %s", err) } @@ -668,7 +730,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) { if err := s.ReapTombstones(4); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("bar") + idx, _, err = s.KVSList(nil, "bar") if err != nil { t.Fatalf("err: %s", err) } @@ -733,7 +795,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { } // Entry was inserted - idx, entry, err := s.KVSGet("foo") + idx, entry, err := s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -775,7 +837,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { } // Entry was not updated in the store - idx, entry, err = s.KVSGet("foo") + idx, entry, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -802,7 +864,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { } // Entry was updated - idx, entry, err = s.KVSGet("foo") + idx, entry, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -829,7 +891,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { } // Entry was updated, but the session should have been ignored. - idx, entry, err = s.KVSGet("foo") + idx, entry, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -874,7 +936,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { } // Entry was updated, and the lock status should have stayed the same. - idx, entry, err = s.KVSGet("foo") + idx, entry, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -938,7 +1000,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) { // Check that the tombstones ware created and that prevents the index // from sliding backwards. - idx, _, err := s.KVSList("foo") + idx, _, err := s.KVSList(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -951,7 +1013,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) { if err := s.ReapTombstones(5); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("foo") + idx, _, err = s.KVSList(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1000,7 +1062,7 @@ func TestStateStore_KVSLock(t *testing.T) { } // Make sure the indexes got set properly. - idx, result, err := s.KVSGet("foo") + idx, result, err := s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1021,7 +1083,7 @@ func TestStateStore_KVSLock(t *testing.T) { // Make sure the indexes got set properly, note that the lock index // won't go up since we didn't lock it again. - idx, result, err = s.KVSGet("foo") + idx, result, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1044,7 +1106,7 @@ func TestStateStore_KVSLock(t *testing.T) { } // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("foo") + idx, result, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1064,7 +1126,7 @@ func TestStateStore_KVSLock(t *testing.T) { } // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("bar") + idx, result, err = s.KVSGet(nil, "bar") if err != nil { t.Fatalf("err: %s", err) } @@ -1090,7 +1152,7 @@ func TestStateStore_KVSLock(t *testing.T) { } // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("bar") + idx, result, err = s.KVSGet(nil, "bar") if err != nil { t.Fatalf("err: %s", err) } @@ -1134,7 +1196,7 @@ func TestStateStore_KVSUnlock(t *testing.T) { } // Make sure the indexes didn't update. - idx, result, err := s.KVSGet("foo") + idx, result, err := s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1163,7 +1225,7 @@ func TestStateStore_KVSUnlock(t *testing.T) { } // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("foo") + idx, result, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1182,7 +1244,7 @@ func TestStateStore_KVSUnlock(t *testing.T) { } // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("foo") + idx, result, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1201,7 +1263,7 @@ func TestStateStore_KVSUnlock(t *testing.T) { } // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("foo") + idx, result, err = s.KVSGet(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -1294,7 +1356,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { restore.Commit() // Read the restored keys back out and verify they match. - idx, res, err := s.KVSList("") + idx, res, err := s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } @@ -1467,7 +1529,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { if err := s.ReapTombstones(4); err != nil { t.Fatalf("err: %s", err) } - idx, _, err := s.KVSList("foo/bar") + idx, _, err := s.KVSList(nil, "foo/bar") if err != nil { t.Fatalf("err: %s", err) } @@ -1504,7 +1566,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { restore.Commit() // See if the stone works properly in a list query. - idx, _, err := s.KVSList("foo/bar") + idx, _, err := s.KVSList(nil, "foo/bar") if err != nil { t.Fatalf("err: %s", err) } @@ -1518,7 +1580,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { if err := s.ReapTombstones(4); err != nil { t.Fatalf("err: %s", err) } - idx, _, err = s.KVSList("foo/bar") + idx, _, err = s.KVSList(nil, "foo/bar") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/state/session_test.go b/consul/state/session_test.go index 6aa6e094c1..7951e14474 100644 --- a/consul/state/session_test.go +++ b/consul/state/session_test.go @@ -807,7 +807,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) { } // Key should be unlocked. - idx, d2, err := s.KVSGet("/foo") + idx, d2, err := s.KVSGet(nil, "/foo") if err != nil { t.Fatalf("err: %s", err) } @@ -889,7 +889,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) { } // Key should be deleted. - idx, d2, err := s.KVSGet("/bar") + idx, d2, err := s.KVSGet(nil, "/bar") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index bac99f70cb..ec600dd068 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -152,7 +152,7 @@ func TestStateStore_Restore_Abort(t *testing.T) { } restore.Abort() - idx, entries, err := s.KVSList("") + idx, entries, err := s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/state/txn.go b/consul/state/txn.go index 00d7905a2c..d2b3c6f1f5 100644 --- a/consul/state/txn.go +++ b/consul/state/txn.go @@ -55,14 +55,14 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str } case structs.KVSGet: - _, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key) + _, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key) if entry == nil && err == nil { err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key) } case structs.KVSGetTree: var entries structs.DirEntries - _, entries, err = s.kvsListTxn(tx, op.DirEnt.Key) + _, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key) if err == nil { results := make(structs.TxnResults, 0, len(entries)) for _, e := range entries { diff --git a/consul/state/txn_test.go b/consul/state/txn_test.go index d868c13523..3423ed1e65 100644 --- a/consul/state/txn_test.go +++ b/consul/state/txn_test.go @@ -295,7 +295,7 @@ func TestStateStore_Txn_KVS(t *testing.T) { } // Pull the resulting state store contents. - idx, actual, err := s.KVSList("") + idx, actual, err := s.KVSList(nil, "") if err != nil { t.Fatalf("err: %s", err) } @@ -364,7 +364,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) { // This function verifies that the state store wasn't changed. verifyStateStore := func(desc string) { - idx, actual, err := s.KVSList("") + idx, actual, err := s.KVSList(nil, "") if err != nil { t.Fatalf("err (%s): %s", desc, err) } diff --git a/consul/txn_endpoint_test.go b/consul/txn_endpoint_test.go index b1e60021c8..e502589a01 100644 --- a/consul/txn_endpoint_test.go +++ b/consul/txn_endpoint_test.go @@ -55,7 +55,7 @@ func TestTxn_Apply(t *testing.T) { // Verify the state store directly. state := s1.fsm.State() - _, d, err := state.KVSGet("test") + _, d, err := state.KVSGet(nil, "test") if err != nil { t.Fatalf("err: %v", err) }