consul/state: adding tree delete for kvs store

This commit is contained in:
Ryan Uber 2015-09-02 20:25:52 -07:00 committed by James Phillips
parent 291fbe02ba
commit b0856c2215
2 changed files with 85 additions and 0 deletions

View File

@ -930,3 +930,38 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
// If we made it this far, we should perform the set.
return true, s.kvsSetTxn(idx, entry, tx)
}
// KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op.
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Get an iterator over all of the keys with the given prefix
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Go over all of the keys and remove them. We call the delete
// directly so that we only update the index once.
var modified bool
for entry := entries.Next(); entry != nil; entry = entries.Next() {
err := tx.Delete("kvs", entry.(*structs.DirEntry))
if err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
modified = true
}
// Update the index
if modified {
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
tx.Commit()
return nil
}

View File

@ -1114,3 +1114,53 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_KVSDeleteTree(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store
testSetKey(t, s, 1, "foo/bar", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
// Calling tree deletion which affects nothing does not
// modify the table index.
if err := s.KVSDeleteTree(9, "bar"); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("kvs"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Call tree deletion with a nested prefix.
if err := s.KVSDeleteTree(5, "foo/bar"); err != nil {
t.Fatalf("err: %s", err)
}
// Check that all the matching keys were deleted
tx := s.db.Txn(false)
defer tx.Abort()
entries, err := tx.Get("kvs", "id")
if err != nil {
t.Fatalf("err: %s", err)
}
num := 0
for entry := entries.Next(); entry != nil; entry = entries.Next() {
if entry.(*structs.DirEntry).Key != "foo/zorp" {
t.Fatalf("unexpected kvs entry: %#v", entry)
}
num++
}
if num != 1 {
t.Fatalf("expected 1 key, got: %d", num)
}
// Index should be updated if modifications are made
if idx := s.maxIndex("kvs"); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}