Cuts KVS endpoints over to new fine-grained watch plumbing.

This commit is contained in:
James Phillips 2017-01-24 11:20:51 -08:00
parent 68e90d0f24
commit 7da2f513dc
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
10 changed files with 161 additions and 98 deletions

View File

@ -387,7 +387,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList("/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -491,7 +491,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify key is set
_, d, err := fsm2.state.KVSGet("/test")
_, d, err := fsm2.state.KVSGet(nil, "/test")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -617,7 +617,7 @@ func TestFSM_KVSSet(t *testing.T) {
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -662,7 +662,7 @@ func TestFSM_KVSDelete(t *testing.T) {
}
// Verify key is not set
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -708,7 +708,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
}
// Verify key is not set
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -742,7 +742,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -763,7 +763,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
// Verify key is gone
_, d, err = fsm.state.KVSGet("/test/path")
_, d, err = fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -797,7 +797,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -819,7 +819,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
// Verify key is updated
_, d, err = fsm.state.KVSGet("/test/path")
_, d, err = fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -976,7 +976,7 @@ func TestFSM_KVSLock(t *testing.T) {
}
// Verify key is locked
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1038,7 +1038,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
}
// Verify key is unlocked
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1234,7 +1234,7 @@ func TestFSM_TombstoneReap(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList("/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1301,7 +1301,7 @@ func TestFSM_Txn(t *testing.T) {
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet("/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// KVS endpoint is used to manipulate the Key-Value store
@ -119,12 +120,11 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Key),
func() error {
index, ent, err := state.KVSGet(args.Key)
func(ws memdb.WatchSet) error {
index, ent, err := state.KVSGet(ws, args.Key)
if err != nil {
return err
}
@ -161,12 +161,11 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Key),
func() error {
index, ent, err := state.KVSList(args.Key)
func(ws memdb.WatchSet) error {
index, ent, err := state.KVSList(ws, args.Key)
if err != nil {
return err
}
@ -204,12 +203,11 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetKVSWatch(args.Prefix),
func() error {
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
func(ws memdb.WatchSet) error {
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
if err != nil {
return err
}

View File

@ -36,7 +36,7 @@ func TestKVS_Apply(t *testing.T) {
// Verify
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
_, d, err := state.KVSGet(nil, "test")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -58,7 +58,7 @@ func TestKVS_Apply(t *testing.T) {
}
// Verify
_, d, err = state.KVSGet("test")
_, d, err = state.KVSGet(nil, "test")
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -119,24 +119,25 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsGetTxn(tx, key)
return s.kvsGetTxn(tx, ws, key)
}
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) {
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
// Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Retrieve the key.
entry, err := tx.First("kvs", "id", key)
watchCh, entry, err := tx.FirstWatch("kvs", "id", key)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(watchCh)
if entry != nil {
return idx, entry.(*structs.DirEntry), nil
}
@ -147,16 +148,16 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirE
// prefix is left empty, all keys in the KVS will be returned. The returned
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsListTxn(tx, prefix)
return s.kvsListTxn(tx, ws, prefix)
}
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.DirEntries, error) {
func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
@ -165,6 +166,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(entries.WatchCh())
// Gather all of the keys found in the store
var ents structs.DirEntries
@ -203,7 +205,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.D
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -215,6 +217,7 @@ func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(entries.WatchCh())
prefixLen := len(prefix)
sepLen := len(sep)

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
func TestStateStore_GC(t *testing.T) {
@ -121,7 +122,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
// Pull out the list and check the index, which should come from the
// tombstones.
idx, _, err := s.KVSList("foo/")
idx, _, err := s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -135,7 +136,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
}
// Should still be good because 7 is in there.
idx, _, err = s.KVSList("foo/")
idx, _, err = s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -149,7 +150,7 @@ func TestStateStore_ReapTombstones(t *testing.T) {
}
// At this point the sub index will slide backwards.
idx, _, err = s.KVSList("foo/")
idx, _, err = s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -173,7 +174,8 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
s := testStateStore(t)
// Get on an nonexistent key returns nil.
idx, result, err := s.KVSGet("foo")
ws := memdb.NewWatchSet()
idx, result, err := s.KVSGet(ws, "foo")
if result != nil || err != nil || idx != 0 {
t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
}
@ -186,9 +188,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if err := s.KVSSet(1, entry); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Retrieve the K/V entry again.
idx, result, err = s.KVSGet("foo")
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -217,9 +223,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if err := s.KVSSet(2, update); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Fetch the kv pair and check.
idx, result, err = s.KVSGet("foo")
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -242,9 +252,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if err := s.KVSSet(3, update); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Fetch the kv pair and check.
idx, result, err = s.KVSGet("foo")
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -276,9 +290,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Fetch the kv pair and check.
idx, result, err = s.KVSGet("foo")
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -304,9 +322,13 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if err := s.KVSSet(7, update); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Fetch the kv pair and check.
idx, result, err = s.KVSGet("foo")
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -323,11 +345,17 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Setting some unrelated key should not fire the watch.
testSetKey(t, s, 8, "other", "yup")
if watchFired(ws) {
t.Fatalf("bad")
}
// Fetch a key that doesn't exist and make sure we get the right
// response.
idx, result, err = s.KVSGet("nope")
if result != nil || err != nil || idx != 7 {
t.Fatalf("expected (7, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
idx, result, err = s.KVSGet(nil, "nope")
if result != nil || err != nil || idx != 8 {
t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
}
}
@ -335,7 +363,8 @@ func TestStateStore_KVSList(t *testing.T) {
s := testStateStore(t)
// Listing an empty KVS returns nothing
idx, entries, err := s.KVSList("")
ws := memdb.NewWatchSet()
idx, entries, err := s.KVSList(ws, "")
if idx != 0 || entries != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err)
}
@ -346,9 +375,12 @@ func TestStateStore_KVSList(t *testing.T) {
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 5, "foo/bar/baz", "baz")
if !watchFired(ws) {
t.Fatalf("bad")
}
// List out all of the keys
idx, entries, err = s.KVSList("")
idx, entries, err = s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -362,7 +394,7 @@ func TestStateStore_KVSList(t *testing.T) {
}
// Try listing with a provided prefix
idx, entries, err = s.KVSList("foo/bar/zip")
idx, entries, err = s.KVSList(nil, "foo/bar/zip")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -379,10 +411,19 @@ func TestStateStore_KVSList(t *testing.T) {
}
// Delete a key and make sure the index comes from the tombstone.
ws = memdb.NewWatchSet()
idx, _, err = s.KVSList(ws, "foo/bar/baz")
if err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(6, "foo/bar/baz"); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("foo/bar/baz")
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, _, err = s.KVSList(ws, "foo/bar/baz")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -390,11 +431,15 @@ func TestStateStore_KVSList(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Set a different key to bump the index.
// Set a different key to bump the index. This shouldn't fire the
// watch since there's a different prefix.
testSetKey(t, s, 7, "some/other/key", "")
if watchFired(ws) {
t.Fatalf("bad")
}
// Make sure we get the right index from the tombstone.
idx, _, err = s.KVSList("foo/bar/baz")
idx, _, err = s.KVSList(nil, "foo/bar/baz")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -407,7 +452,7 @@ func TestStateStore_KVSList(t *testing.T) {
if err := s.ReapTombstones(6); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("foo/bar/baz")
idx, _, err = s.KVSList(nil, "foo/bar/baz")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -416,7 +461,7 @@ func TestStateStore_KVSList(t *testing.T) {
}
// List all the keys to make sure the index is also correct.
idx, _, err = s.KVSList("")
idx, _, err = s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -429,7 +474,8 @@ func TestStateStore_KVSListKeys(t *testing.T) {
s := testStateStore(t)
// Listing keys with no results returns nil.
idx, keys, err := s.KVSListKeys("", "")
ws := memdb.NewWatchSet()
idx, keys, err := s.KVSListKeys(ws, "", "")
if idx != 0 || keys != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err)
}
@ -442,9 +488,12 @@ func TestStateStore_KVSListKeys(t *testing.T) {
testSetKey(t, s, 5, "foo/bar/zip/zam", "zam")
testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 7, "some/other/prefix", "nack")
if !watchFired(ws) {
t.Fatalf("bad")
}
// List all the keys.
idx, keys, err = s.KVSListKeys("", "")
idx, keys, err = s.KVSListKeys(nil, "", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -456,7 +505,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
}
// Query using a prefix and pass a separator.
idx, keys, err = s.KVSListKeys("foo/bar/", "/")
idx, keys, err = s.KVSListKeys(nil, "foo/bar/", "/")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -474,7 +523,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
}
// Listing keys with no separator returns everything.
idx, keys, err = s.KVSListKeys("foo", "")
idx, keys, err = s.KVSListKeys(nil, "foo", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -488,10 +537,19 @@ func TestStateStore_KVSListKeys(t *testing.T) {
}
// Delete a key and make sure the index comes from the tombstone.
ws = memdb.NewWatchSet()
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(8, "foo/bar/baz"); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -499,11 +557,15 @@ func TestStateStore_KVSListKeys(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Set a different key to bump the index.
// Set a different key to bump the index. This shouldn't fire the watch
// since there's a different prefix.
testSetKey(t, s, 9, "some/other/key", "")
if watchFired(ws) {
t.Fatalf("bad")
}
// Make sure the index still comes from the tombstone.
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -516,7 +578,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
if err := s.ReapTombstones(8); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSListKeys("foo/bar/baz", "")
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -525,7 +587,7 @@ func TestStateStore_KVSListKeys(t *testing.T) {
}
// List all the keys to make sure the index is also correct.
idx, _, err = s.KVSListKeys("", "")
idx, _, err = s.KVSListKeys(nil, "", "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -573,7 +635,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
// Check that the tombstone was created and that prevents the index
// from sliding backwards.
idx, _, err := s.KVSList("foo")
idx, _, err := s.KVSList(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -586,7 +648,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
if err := s.ReapTombstones(3); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("foo")
idx, _, err = s.KVSList(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -620,7 +682,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
// Check that the index is untouched and the entry
// has not been deleted.
idx, e, err := s.KVSGet("foo")
idx, e, err := s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -639,7 +701,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
}
// Entry was deleted and index was updated
idx, e, err = s.KVSGet("bar")
idx, e, err = s.KVSGet(nil, "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -655,7 +717,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
// Check that the tombstone was created and that prevents the index
// from sliding backwards.
idx, _, err = s.KVSList("bar")
idx, _, err = s.KVSList(nil, "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -668,7 +730,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("bar")
idx, _, err = s.KVSList(nil, "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -733,7 +795,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was inserted
idx, entry, err := s.KVSGet("foo")
idx, entry, err := s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -775,7 +837,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was not updated in the store
idx, entry, err = s.KVSGet("foo")
idx, entry, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -802,7 +864,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated
idx, entry, err = s.KVSGet("foo")
idx, entry, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -829,7 +891,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated, but the session should have been ignored.
idx, entry, err = s.KVSGet("foo")
idx, entry, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -874,7 +936,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated, and the lock status should have stayed the same.
idx, entry, err = s.KVSGet("foo")
idx, entry, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -938,7 +1000,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
// Check that the tombstones ware created and that prevents the index
// from sliding backwards.
idx, _, err := s.KVSList("foo")
idx, _, err := s.KVSList(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -951,7 +1013,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
if err := s.ReapTombstones(5); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("foo")
idx, _, err = s.KVSList(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1000,7 +1062,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err := s.KVSGet("foo")
idx, result, err := s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1021,7 +1083,7 @@ func TestStateStore_KVSLock(t *testing.T) {
// Make sure the indexes got set properly, note that the lock index
// won't go up since we didn't lock it again.
idx, result, err = s.KVSGet("foo")
idx, result, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1044,7 +1106,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet("foo")
idx, result, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1064,7 +1126,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet("bar")
idx, result, err = s.KVSGet(nil, "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1090,7 +1152,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet("bar")
idx, result, err = s.KVSGet(nil, "bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1134,7 +1196,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err := s.KVSGet("foo")
idx, result, err := s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1163,7 +1225,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet("foo")
idx, result, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1182,7 +1244,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet("foo")
idx, result, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1201,7 +1263,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet("foo")
idx, result, err = s.KVSGet(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1294,7 +1356,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
restore.Commit()
// Read the restored keys back out and verify they match.
idx, res, err := s.KVSList("")
idx, res, err := s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1467,7 +1529,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err := s.KVSList("foo/bar")
idx, _, err := s.KVSList(nil, "foo/bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1504,7 +1566,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
restore.Commit()
// See if the stone works properly in a list query.
idx, _, err := s.KVSList("foo/bar")
idx, _, err := s.KVSList(nil, "foo/bar")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1518,7 +1580,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList("foo/bar")
idx, _, err = s.KVSList(nil, "foo/bar")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -807,7 +807,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
}
// Key should be unlocked.
idx, d2, err := s.KVSGet("/foo")
idx, d2, err := s.KVSGet(nil, "/foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -889,7 +889,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
}
// Key should be deleted.
idx, d2, err := s.KVSGet("/bar")
idx, d2, err := s.KVSGet(nil, "/bar")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -152,7 +152,7 @@ func TestStateStore_Restore_Abort(t *testing.T) {
}
restore.Abort()
idx, entries, err := s.KVSList("")
idx, entries, err := s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -55,14 +55,14 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
}
case structs.KVSGet:
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key)
if entry == nil && err == nil {
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
}
case structs.KVSGetTree:
var entries structs.DirEntries
_, entries, err = s.kvsListTxn(tx, op.DirEnt.Key)
_, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key)
if err == nil {
results := make(structs.TxnResults, 0, len(entries))
for _, e := range entries {

View File

@ -295,7 +295,7 @@ func TestStateStore_Txn_KVS(t *testing.T) {
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList("")
idx, actual, err := s.KVSList(nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -364,7 +364,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList("")
idx, actual, err := s.KVSList(nil, "")
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}

View File

@ -55,7 +55,7 @@ func TestTxn_Apply(t *testing.T) {
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
_, d, err := state.KVSGet(nil, "test")
if err != nil {
t.Fatalf("err: %v", err)
}