consul/state: adding shallow delete for kvs store

This commit is contained in:
Ryan Uber 2015-09-01 16:50:28 -07:00 committed by James Phillips
parent 08d41224a3
commit a0fd9feda3
2 changed files with 92 additions and 3 deletions

View File

@ -719,7 +719,7 @@ func (s *StateStore) kvsSetTxn(
// Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return fmt.Errorf("failed key lookup: %s", err)
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Set the indexes
@ -733,7 +733,7 @@ func (s *StateStore) kvsSetTxn(
// Store the kv pair in the state store and update the index
if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kv entry: %s", err)
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
@ -750,7 +750,41 @@ func (s *StateStore) KVSGet(key string) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed key lookup: %s", err)
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
return entry.(*structs.DirEntry), nil
}
// KVSDelete is used to perform a shallow delete on a single key in the
// the state store.
func (s *StateStore) KVSDelete(idx uint64, key string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual delete
if err := s.kvsDeleteTxn(idx, key, tx); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *StateStore) kvsDeleteTxn(idx uint64, key string, tx *memdb.Txn) error {
// Look up the entry in the state store
entry, err := tx.First("kvs", "id", key)
if err != nil {
return fmt.Errorf("failed key lookup: %s", err)
}
// Delete the entry and update the index
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}

View File

@ -82,6 +82,23 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
}
}
func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
if err := s.KVSSet(idx, entry); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
e, err := tx.First("kvs", "id", key)
if err != nil {
t.Fatalf("err: %s", err)
}
if result, ok := e.(*structs.DirEntry); !ok || result.Key != key {
t.Fatalf("bad kvs entry: %#v", result)
}
}
func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t)
@ -828,3 +845,41 @@ func TestStateStore_KVSSet(t *testing.T) {
t.Fatalf("expected 'baz', got '%s'", v)
}
}
func TestStateStore_KVSDelete(t *testing.T) {
s := testStateStore(t)
// Create some KV pairs
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
// Call a delete on a specific key
if err := s.KVSDelete(3, "foo"); err != nil {
t.Fatalf("err: %s", err)
}
// The entry was removed from the state store
tx := s.db.Txn(false)
defer tx.Abort()
e, err := tx.First("kvs", "id", "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if e != nil {
t.Fatalf("expected kvs entry to be deleted, got: %#v", e)
}
// Try fetching the other keys to ensure they still exist
e, err = tx.First("kvs", "id", "foo/bar")
if err != nil {
t.Fatalf("err: %s", err)
}
if e == nil || string(e.(*structs.DirEntry).Value) != "bar" {
t.Fatalf("bad kvs entry: %#v", e)
}
// Check that the index table was updated
if idx := s.maxIndex("kvs"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}