From b0856c221584c6157e09e7a45ba696aeac63fe5e Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 2 Sep 2015 20:25:52 -0700 Subject: [PATCH] consul/state: adding tree delete for kvs store --- consul/state/state_store.go | 35 ++++++++++++++++++++++ consul/state/state_store_test.go | 50 ++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 6a1e5ce26f..5973558c80 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -930,3 +930,38 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error // If we made it this far, we should perform the set. return true, s.kvsSetTxn(idx, entry, tx) } + +// KVSDeleteTree is used to do a recursive delete on a key prefix +// in the state store. If any keys are modified, the last index is +// set, otherwise this is a no-op. +func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get an iterator over all of the keys with the given prefix + entries, err := tx.Get("kvs", "id_prefix", prefix) + if err != nil { + return fmt.Errorf("failed kvs lookup: %s", err) + } + + // Go over all of the keys and remove them. We call the delete + // directly so that we only update the index once. + var modified bool + for entry := entries.Next(); entry != nil; entry = entries.Next() { + err := tx.Delete("kvs", entry.(*structs.DirEntry)) + if err != nil { + return fmt.Errorf("failed deleting kvs entry: %s", err) + } + modified = true + } + + // Update the index + if modified { + if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } + + tx.Commit() + return nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index d39b09efc0..fa49662653 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -1114,3 +1114,53 @@ func TestStateStore_KVSSetCAS(t *testing.T) { t.Fatalf("bad index: %d", idx) } } + +func TestStateStore_KVSDeleteTree(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store + testSetKey(t, s, 1, "foo/bar", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/zorp", "zorp") + + // Calling tree deletion which affects nothing does not + // modify the table index. + if err := s.KVSDeleteTree(9, "bar"); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("kvs"); idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Call tree deletion with a nested prefix. + if err := s.KVSDeleteTree(5, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + + // Check that all the matching keys were deleted + tx := s.db.Txn(false) + defer tx.Abort() + + entries, err := tx.Get("kvs", "id") + if err != nil { + t.Fatalf("err: %s", err) + } + + num := 0 + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if entry.(*structs.DirEntry).Key != "foo/zorp" { + t.Fatalf("unexpected kvs entry: %#v", entry) + } + num++ + } + + if num != 1 { + t.Fatalf("expected 1 key, got: %d", num) + } + + // Index should be updated if modifications are made + if idx := s.maxIndex("kvs"); idx != 5 { + t.Fatalf("bad index: %d", idx) + } +}