mirror of https://github.com/status-im/consul.git
consul/state: initial pass at CAS delete operation for kvs
This commit is contained in:
parent
4ba89adb7d
commit
8a70ba2cc5
|
@ -752,7 +752,10 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
return entry.(*structs.DirEntry), nil
|
||||
if entry != nil {
|
||||
return entry.(*structs.DirEntry), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// KVSList is used to list out all keys under a given prefix. If the
|
||||
|
@ -802,7 +805,7 @@ func (s *StateStore) kvsDeleteTxn(idx uint64, key string, tx *memdb.Txn) error {
|
|||
// Look up the entry in the state store
|
||||
entry, err := tx.First("kvs", "id", key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed key lookup: %s", err)
|
||||
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
||||
// Delete the entry and update the index
|
||||
|
@ -814,3 +817,34 @@ func (s *StateStore) kvsDeleteTxn(idx uint64, key string, tx *memdb.Txn) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// KVSDeleteCAS is used to try doing a KV delete operation with a given
|
||||
// raft index. If the CAS index specified is not equal to the last
|
||||
// observed index for the given key, then the call is a noop, otherwise
|
||||
// a normal KV delete is invoked.
|
||||
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Retrieve the existing kvs entry, if any exists
|
||||
entry, err := tx.First("kvs", "id", key)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
||||
// If the existing index does not match the provided CAS
|
||||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
e, ok := entry.(*structs.DirEntry)
|
||||
if !ok || e.ModifyIndex != cidx {
|
||||
return entry == nil, nil
|
||||
}
|
||||
|
||||
// Call the actual deletion if the above passed
|
||||
if err := s.kvsDeleteTxn(idx, key, tx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -939,3 +939,50 @@ func TestStateStore_KVSList(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create some KV entries
|
||||
testSetKey(t, s, 1, "foo", "foo")
|
||||
testSetKey(t, s, 2, "bar", "bar")
|
||||
testSetKey(t, s, 3, "baz", "baz")
|
||||
|
||||
// Do a CAS delete with an index lower than the entry
|
||||
ok, err := s.KVSDeleteCAS(4, 1, "bar")
|
||||
if ok || err != nil {
|
||||
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
|
||||
}
|
||||
|
||||
// Check that the index is untouched and the entry
|
||||
// has not been deleted.
|
||||
if idx := s.maxIndex("kvs"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
e, err := s.KVSGet("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if e == nil {
|
||||
t.Fatalf("expected a kvs entry, got nil")
|
||||
}
|
||||
|
||||
// Do another CAS delete, this time with the correct index
|
||||
// which should cause the delete to take place.
|
||||
ok, err = s.KVSDeleteCAS(4, 2, "bar")
|
||||
if !ok || err != nil {
|
||||
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
|
||||
}
|
||||
|
||||
// Entry was deleted and index was updated
|
||||
if idx := s.maxIndex("kvs"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
e, err = s.KVSGet("bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if e != nil {
|
||||
t.Fatalf("entry should be deleted")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue