mirror of https://github.com/status-im/consul.git
Allows lock holder to re-lock and set a KV, adds tests for corner cases around sessions.
This commit is contained in:
parent
75f9cd5cc3
commit
18d60f95c5
|
@ -1036,7 +1036,7 @@ func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Perform the actual set.
|
// Perform the actual set.
|
||||||
if err := s.kvsSetTxn(tx, idx, entry); err != nil {
|
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1046,23 +1046,35 @@ func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
||||||
|
|
||||||
// kvsSetTxn is used to insert or update a key/value pair in the state
|
// kvsSetTxn is used to insert or update a key/value pair in the state
|
||||||
// store. It is the inner method used and handles only the actual storage.
|
// store. It is the inner method used and handles only the actual storage.
|
||||||
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) error {
|
// If updateSession is true, then the incoming entry will set the new
|
||||||
|
// session (should be validated before calling this). Otherwise, we will keep
|
||||||
|
// whatever the existing session is.
|
||||||
|
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
|
||||||
// Retrieve an existing KV pair
|
// Retrieve an existing KV pair
|
||||||
existing, err := tx.First("kvs", "id", entry.Key)
|
existing, err := tx.First("kvs", "id", entry.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed kvs lookup: %s", err)
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the indexes
|
// Set the indexes.
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
|
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
|
||||||
entry.ModifyIndex = idx
|
|
||||||
} else {
|
} else {
|
||||||
entry.CreateIndex = idx
|
entry.CreateIndex = idx
|
||||||
entry.ModifyIndex = idx
|
}
|
||||||
|
entry.ModifyIndex = idx
|
||||||
|
|
||||||
|
// Preserve the existing session unless told otherwise. The "existing"
|
||||||
|
// session for a new entry is "no session".
|
||||||
|
if !updateSession {
|
||||||
|
if existing != nil {
|
||||||
|
entry.Session = existing.(*structs.DirEntry).Session
|
||||||
|
} else {
|
||||||
|
entry.Session = ""
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the kv pair in the state store and update the index
|
// Store the kv pair in the state store and update the index.
|
||||||
if err := tx.Insert("kvs", entry); err != nil {
|
if err := tx.Insert("kvs", entry); err != nil {
|
||||||
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -1287,7 +1299,7 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we made it this far, we should perform the set.
|
// If we made it this far, we should perform the set.
|
||||||
if err := s.kvsSetTxn(tx, idx, entry); err != nil {
|
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1370,14 +1382,19 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
|
||||||
|
|
||||||
// Set up the entry, using the existing entry if present.
|
// Set up the entry, using the existing entry if present.
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
// Bail if there's already a lock on this entry.
|
|
||||||
e := existing.(*structs.DirEntry)
|
e := existing.(*structs.DirEntry)
|
||||||
if e.Session != "" {
|
if e.Session == entry.Session {
|
||||||
|
// We already hold this lock, good to go.
|
||||||
|
entry.CreateIndex = e.CreateIndex
|
||||||
|
entry.LockIndex = e.LockIndex
|
||||||
|
} else if e.Session != "" {
|
||||||
|
// Bail out, someone else holds this lock.
|
||||||
return false, nil
|
return false, nil
|
||||||
|
} else {
|
||||||
|
// Set up a new lock with this session.
|
||||||
|
entry.CreateIndex = e.CreateIndex
|
||||||
|
entry.LockIndex = e.LockIndex + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
entry.CreateIndex = e.CreateIndex
|
|
||||||
entry.LockIndex = e.LockIndex + 1
|
|
||||||
} else {
|
} else {
|
||||||
entry.CreateIndex = idx
|
entry.CreateIndex = idx
|
||||||
entry.LockIndex = 1
|
entry.LockIndex = 1
|
||||||
|
@ -1385,7 +1402,7 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
|
||||||
entry.ModifyIndex = idx
|
entry.ModifyIndex = idx
|
||||||
|
|
||||||
// If we made it this far, we should perform the set.
|
// If we made it this far, we should perform the set.
|
||||||
if err := s.kvsSetTxn(tx, idx, entry); err != nil {
|
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1428,7 +1445,7 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
|
||||||
entry.ModifyIndex = idx
|
entry.ModifyIndex = idx
|
||||||
|
|
||||||
// If we made it this far, we should perform the set.
|
// If we made it this far, we should perform the set.
|
||||||
if err := s.kvsSetTxn(tx, idx, entry); err != nil {
|
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1696,7 +1713,7 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
||||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||||
e := entry.(*structs.DirEntry).Clone()
|
e := entry.(*structs.DirEntry).Clone()
|
||||||
e.Session = ""
|
e.Session = ""
|
||||||
if err := s.kvsSetTxn(tx, idx, e); err != nil {
|
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
||||||
return fmt.Errorf("failed kvs update: %s", err)
|
return fmt.Errorf("failed kvs update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1467,13 +1467,13 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||||
func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
// Get on an nonexistent key returns nil
|
// Get on an nonexistent key returns nil.
|
||||||
result, err := s.KVSGet("foo")
|
result, err := s.KVSGet("foo")
|
||||||
if result != nil || err != nil {
|
if result != nil || err != nil {
|
||||||
t.Fatalf("expected (nil, nil), got : (%#v, %#v)", result, err)
|
t.Fatalf("expected (nil, nil), got : (%#v, %#v)", result, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write a new K/V entry to the store
|
// Write a new K/V entry to the store.
|
||||||
entry := &structs.DirEntry{
|
entry := &structs.DirEntry{
|
||||||
Key: "foo",
|
Key: "foo",
|
||||||
Value: []byte("bar"),
|
Value: []byte("bar"),
|
||||||
|
@ -1482,7 +1482,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the K/V entry again
|
// Retrieve the K/V entry again.
|
||||||
result, err = s.KVSGet("foo")
|
result, err = s.KVSGet("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
@ -1491,17 +1491,22 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
||||||
t.Fatalf("expected k/v pair, got nothing")
|
t.Fatalf("expected k/v pair, got nothing")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the index was injected into the result
|
// Check that the index was injected into the result.
|
||||||
if result.CreateIndex != 1 || result.ModifyIndex != 1 {
|
if result.CreateIndex != 1 || result.ModifyIndex != 1 {
|
||||||
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the value matches
|
// Check that the value matches.
|
||||||
if v := string(result.Value); v != "bar" {
|
if v := string(result.Value); v != "bar" {
|
||||||
t.Fatalf("expected 'bar', got: '%s'", v)
|
t.Fatalf("expected 'bar', got: '%s'", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updating the entry works and changes the index
|
// Index was updated.
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 1 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updating the entry works and changes the index.
|
||||||
update := &structs.DirEntry{
|
update := &structs.DirEntry{
|
||||||
Key: "foo",
|
Key: "foo",
|
||||||
Value: []byte("baz"),
|
Value: []byte("baz"),
|
||||||
|
@ -1510,7 +1515,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the kv pair and check
|
// Fetch the kv pair and check.
|
||||||
result, err = s.KVSGet("foo")
|
result, err = s.KVSGet("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
@ -1521,6 +1526,106 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
|
||||||
if v := string(result.Value); v != "baz" {
|
if v := string(result.Value); v != "baz" {
|
||||||
t.Fatalf("expected 'baz', got '%s'", v)
|
t.Fatalf("expected 'baz', got '%s'", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Index was updated.
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to set the session during an update.
|
||||||
|
update = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("zoo"),
|
||||||
|
Session: "nope",
|
||||||
|
}
|
||||||
|
if err := s.KVSSet(3, update); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the kv pair and check.
|
||||||
|
result, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result.CreateIndex != 1 || result.ModifyIndex != 3 {
|
||||||
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
|
}
|
||||||
|
if v := string(result.Value); v != "zoo" {
|
||||||
|
t.Fatalf("expected 'zoo', got '%s'", v)
|
||||||
|
}
|
||||||
|
if result.Session != "" {
|
||||||
|
t.Fatalf("expected empty session, got '%s", result.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated.
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a real session and then lock the key to set the session.
|
||||||
|
testRegisterNode(t, s, 4, "node1")
|
||||||
|
if err := s.SessionCreate(5, &structs.Session{ID: "session1", Node: "node1"}); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
update = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("locked"),
|
||||||
|
Session: "session1",
|
||||||
|
}
|
||||||
|
ok, err := s.KVSLock(6, update)
|
||||||
|
if !ok || err != nil {
|
||||||
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the kv pair and check.
|
||||||
|
result, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result.CreateIndex != 1 || result.ModifyIndex != 6 {
|
||||||
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
|
}
|
||||||
|
if v := string(result.Value); v != "locked" {
|
||||||
|
t.Fatalf("expected 'zoo', got '%s'", v)
|
||||||
|
}
|
||||||
|
if result.Session != "session1" {
|
||||||
|
t.Fatalf("expected session, got '%s", result.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated.
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now make an update without the session and make sure it gets applied
|
||||||
|
// and doesn't take away the session (it is allowed to change the value).
|
||||||
|
update = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("stoleit"),
|
||||||
|
}
|
||||||
|
if err := s.KVSSet(7, update); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the kv pair and check.
|
||||||
|
result, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result.CreateIndex != 1 || result.ModifyIndex != 7 {
|
||||||
|
t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex)
|
||||||
|
}
|
||||||
|
if v := string(result.Value); v != "stoleit" {
|
||||||
|
t.Fatalf("expected 'zoo', got '%s'", v)
|
||||||
|
}
|
||||||
|
if result.Session != "session1" {
|
||||||
|
t.Fatalf("expected session, got '%s", result.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated.
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_KVSList(t *testing.T) {
|
func TestStateStore_KVSList(t *testing.T) {
|
||||||
|
@ -1771,11 +1876,13 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry was inserted
|
// Entry was inserted
|
||||||
tx = s.db.Txn(false)
|
entry, err = s.KVSGet("foo")
|
||||||
if e, err := tx.First("kvs", "id", "foo"); e == nil || err != nil || string(e.(*structs.DirEntry).Value) != "foo" {
|
if err != nil {
|
||||||
t.Fatalf("expected kvs to exist, got: (%#v, %#v)", e, err)
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||||
|
t.Fatalf("bad entry: %#v", entry)
|
||||||
}
|
}
|
||||||
tx.Abort()
|
|
||||||
|
|
||||||
// Index was updated
|
// Index was updated
|
||||||
if idx := s.maxIndex("kvs"); idx != 2 {
|
if idx := s.maxIndex("kvs"); idx != 2 {
|
||||||
|
@ -1813,15 +1920,12 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry was not updated in the store
|
// Entry was not updated in the store
|
||||||
tx = s.db.Txn(false)
|
entry, err = s.KVSGet("foo")
|
||||||
e, err := tx.First("kvs", "id", "foo")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
result, ok := e.(*structs.DirEntry)
|
if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 {
|
||||||
if !ok || result.CreateIndex != 2 ||
|
t.Fatalf("bad entry: %#v", entry)
|
||||||
result.ModifyIndex != 2 || string(result.Value) != "foo" {
|
|
||||||
t.Fatalf("bad: %#v", result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index was not modified
|
// Index was not modified
|
||||||
|
@ -1845,16 +1949,94 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry was updated
|
// Entry was updated
|
||||||
tx = s.db.Txn(false)
|
entry, err = s.KVSGet("foo")
|
||||||
if e, err := tx.First("kvs", "id", "foo"); e == nil || err != nil || string(e.(*structs.DirEntry).Value) != "bar" {
|
if err != nil {
|
||||||
t.Fatalf("expected kvs to exist, got: (%#v, %#v)", e, err)
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 {
|
||||||
|
t.Fatalf("bad entry: %#v", entry)
|
||||||
}
|
}
|
||||||
tx.Abort()
|
|
||||||
|
|
||||||
// Index was updated
|
// Index was updated
|
||||||
if idx := s.maxIndex("kvs"); idx != 3 {
|
if idx := s.maxIndex("kvs"); idx != 3 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attempt to update the session during the CAS.
|
||||||
|
entry = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("zoo"),
|
||||||
|
Session: "nope",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err = s.KVSSetCAS(4, entry)
|
||||||
|
if !ok || err != nil {
|
||||||
|
t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry was updated, but the session should have been ignored.
|
||||||
|
entry, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 ||
|
||||||
|
entry.Session != "" {
|
||||||
|
t.Fatalf("bad entry: %#v", entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 4 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now lock it and try the update, which should keep the session.
|
||||||
|
testRegisterNode(t, s, 5, "node1")
|
||||||
|
if err := s.SessionCreate(6, &structs.Session{ID: "session1", Node: "node1"}); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
entry = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("locked"),
|
||||||
|
Session: "session1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 4,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err = s.KVSLock(6, entry)
|
||||||
|
if !ok || err != nil {
|
||||||
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
|
}
|
||||||
|
entry = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("locked"),
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 6,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err = s.KVSSetCAS(7, entry)
|
||||||
|
if !ok || err != nil {
|
||||||
|
t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry was updated, and the lock status should have stayed the same.
|
||||||
|
entry, err = s.KVSGet("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 ||
|
||||||
|
entry.Session != "session1" {
|
||||||
|
t.Fatalf("bad entry: %#v", entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_KVSDeleteTree(t *testing.T) {
|
func TestStateStore_KVSDeleteTree(t *testing.T) {
|
||||||
|
@ -1922,13 +2104,13 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
// Lock with no session should fail.
|
// Lock with no session should fail.
|
||||||
ok, err := s.KVSLock(0, &structs.DirEntry{Key: "foo"})
|
ok, err := s.KVSLock(0, &structs.DirEntry{Key: "foo", Value: []byte("foo")})
|
||||||
if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
|
if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
|
||||||
t.Fatalf("didn't detect missing session: %v %s", ok, err)
|
t.Fatalf("didn't detect missing session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now try with a bogus session.
|
// Now try with a bogus session.
|
||||||
ok, err = s.KVSLock(1, &structs.DirEntry{Key: "foo", Session: "nope"})
|
ok, err = s.KVSLock(1, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: "nope"})
|
||||||
if ok || err == nil || !strings.Contains(err.Error(), "invalid session") {
|
if ok || err == nil || !strings.Contains(err.Error(), "invalid session") {
|
||||||
t.Fatalf("didn't detect invalid session: %v %s", ok, err)
|
t.Fatalf("didn't detect invalid session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -1940,7 +2122,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock and make the key at the same time.
|
// Lock and make the key at the same time.
|
||||||
ok, err = s.KVSLock(4, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSLock(4, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -1950,32 +2132,41 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 {
|
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "foo" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 4 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-locking should not return an error, but will report that it didn't
|
// Re-locking with the same session should update the value and report
|
||||||
// get the lock.
|
// success.
|
||||||
ok, err = s.KVSLock(5, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSLock(5, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: "session1"})
|
||||||
if ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
|
t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure the indexes didn't update.
|
// Make sure the indexes got set properly, note that the lock index
|
||||||
|
// won't go up since we didn't lock it again.
|
||||||
result, err = s.KVSGet("foo")
|
result, err = s.KVSGet("foo")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 {
|
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 5 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "bar" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock and the re-lock.
|
// Unlock and the re-lock.
|
||||||
ok, err = s.KVSUnlock(6, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSUnlock(6, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking a locked key: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking a locked key: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
ok, err = s.KVSLock(7, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSLock(7, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -1985,13 +2176,17 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 2 || result.CreateIndex != 4 || result.ModifyIndex != 7 {
|
if result.LockIndex != 2 || result.CreateIndex != 4 || result.ModifyIndex != 7 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "zoo" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock an existing key.
|
// Lock an existing key.
|
||||||
testSetKey(t, s, 8, "bar", "bar")
|
testSetKey(t, s, 8, "bar", "bar")
|
||||||
ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Session: "session1"})
|
ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Value: []byte("xxx"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2001,8 +2196,12 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 {
|
if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "xxx" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 9 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempting a re-lock with a different session should also fail.
|
// Attempting a re-lock with a different session should also fail.
|
||||||
|
@ -2012,7 +2211,7 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
|
|
||||||
// Re-locking should not return an error, but will report that it didn't
|
// Re-locking should not return an error, but will report that it didn't
|
||||||
// get the lock.
|
// get the lock.
|
||||||
ok, err = s.KVSLock(11, &structs.DirEntry{Key: "bar", Session: "session2"})
|
ok, err = s.KVSLock(11, &structs.DirEntry{Key: "bar", Value: []byte("nope"), Session: "session2"})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
|
t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2022,8 +2221,12 @@ func TestStateStore_KVSLock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 {
|
if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "xxx" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 9 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2031,7 +2234,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
// Unlock with no session should fail.
|
// Unlock with no session should fail.
|
||||||
ok, err := s.KVSUnlock(0, &structs.DirEntry{Key: "foo"})
|
ok, err := s.KVSUnlock(0, &structs.DirEntry{Key: "foo", Value: []byte("bar")})
|
||||||
if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
|
if ok || err == nil || !strings.Contains(err.Error(), "missing session") {
|
||||||
t.Fatalf("didn't detect missing session: %v %s", ok, err)
|
t.Fatalf("didn't detect missing session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2044,14 +2247,14 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
|
|
||||||
// Unlock with a real session but no key should not return an error, but
|
// Unlock with a real session but no key should not return an error, but
|
||||||
// will report it didn't unlock anything.
|
// will report it didn't unlock anything.
|
||||||
ok, err = s.KVSUnlock(3, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSUnlock(3, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: "session1"})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking a missing key: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking a missing key: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a key and unlock it, without it being locked.
|
// Make a key and unlock it, without it being locked.
|
||||||
testSetKey(t, s, 4, "foo", "foo")
|
testSetKey(t, s, 4, "foo", "bar")
|
||||||
ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: "session1"})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2061,12 +2264,16 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 0 || result.CreateIndex != 4 || result.ModifyIndex != 4 {
|
if result.LockIndex != 0 || result.CreateIndex != 4 || result.ModifyIndex != 4 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "bar" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 4 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock it with the first session.
|
// Lock it with the first session.
|
||||||
ok, err = s.KVSLock(6, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSLock(6, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2075,7 +2282,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
if err := s.SessionCreate(7, &structs.Session{ID: "session2", Node: "node1"}); err != nil {
|
if err := s.SessionCreate(7, &structs.Session{ID: "session2", Node: "node1"}); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
ok, err = s.KVSUnlock(8, &structs.DirEntry{Key: "foo", Session: "session2"})
|
ok, err = s.KVSUnlock(8, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: "session2"})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking with the wrong session: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking with the wrong session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2085,12 +2292,16 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 6 {
|
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 6 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "bar" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now do the unlock with the correct session.
|
// Now do the unlock with the correct session.
|
||||||
ok, err = s.KVSUnlock(9, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSUnlock(9, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: "session1"})
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking with the correct session: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking with the correct session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2100,12 +2311,16 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 {
|
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "zoo" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 9 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlocking again should fail and not change anything.
|
// Unlocking again should fail and not change anything.
|
||||||
ok, err = s.KVSUnlock(10, &structs.DirEntry{Key: "foo", Session: "session1"})
|
ok, err = s.KVSUnlock(10, &structs.DirEntry{Key: "foo", Value: []byte("nope"), Session: "session1"})
|
||||||
if ok || err != nil {
|
if ok || err != nil {
|
||||||
t.Fatalf("didn't handle unlocking with the previous session: %v %s", ok, err)
|
t.Fatalf("didn't handle unlocking with the previous session: %v %s", ok, err)
|
||||||
}
|
}
|
||||||
|
@ -2115,8 +2330,12 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 {
|
if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 ||
|
||||||
t.Fatalf("bad index: %d, %d, %d", result.LockIndex, result.CreateIndex, result.ModifyIndex)
|
string(result.Value) != "zoo" {
|
||||||
|
t.Fatalf("bad entry: %#v", result)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("kvs"); idx != 9 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue