mirror of https://github.com/status-im/consul.git
Use new DeletePrefixMethod for implementing KVSDeleteTree operation. This makes deletes on sub trees larger than one million nodes about 100 times faster. Added unit tests.
This commit is contained in:
parent
2cd7408c06
commit
36acf8d6a4
|
@ -420,36 +420,21 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
|
|||
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
|
||||
// existing transaction.
|
||||
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
|
||||
// Get an iterator over all of the keys with the given prefix.
|
||||
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
||||
|
||||
// For prefix deletes, only insert one tombstone and delete the entire subtree
|
||||
|
||||
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||
return fmt.Errorf("failed recursive deleting kvs entry: %s", err)
|
||||
}
|
||||
|
||||
// Go over all of the keys and remove them. We call the delete
|
||||
// directly so that we only update the index once. We also add
|
||||
// tombstones as we go.
|
||||
var modified bool
|
||||
var objs []interface{}
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
e := entry.(*structs.DirEntry)
|
||||
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
||||
// Update the index if the delete was successful.
|
||||
// Missing prefixes don't result in an index update
|
||||
if deleted {
|
||||
if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil {
|
||||
return fmt.Errorf("failed adding to graveyard: %s", err)
|
||||
}
|
||||
objs = append(objs, entry)
|
||||
modified = true
|
||||
}
|
||||
|
||||
// Do the actual deletes in a separate loop so we don't trash the
|
||||
// iterator as we go.
|
||||
for _, obj := range objs {
|
||||
if err := tx.Delete("kvs", obj); err != nil {
|
||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the index
|
||||
if modified {
|
||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
@ -1022,6 +1024,163 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Watches_PrefixDelete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create some KVS entries
|
||||
testSetKey(t, s, 1, "foo", "foo")
|
||||
testSetKey(t, s, 2, "foo/bar", "bar")
|
||||
testSetKey(t, s, 3, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
|
||||
testSetKey(t, s, 5, "foo/bar/baz", "baz")
|
||||
|
||||
// Delete a key and make sure the index comes from the tombstone.
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, _, err := s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
if err := s.KVSDeleteTree(6, "foo/bar"); err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("expected watch to fire but it did not")
|
||||
}
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, _, err = s.KVSList(ws, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d, expected %d", idx, 6)
|
||||
}
|
||||
|
||||
// Set a different key to bump the index. This shouldn't fire the
|
||||
// watch since there's a different prefix.
|
||||
testSetKey(t, s, 7, "some/other/key", "")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Make sure we get the right index from the tombstone for the prefix
|
||||
idx, _, err = s.KVSList(nil, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d, expected %v", idx, 7)
|
||||
}
|
||||
|
||||
// Now ask for the index for a node within the prefix that was deleted
|
||||
// We expect to get the max index in the tree because the tombstone contains the parent foo/bar
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d, expected %v", idx, 7)
|
||||
}
|
||||
|
||||
// Now reap the tombstones and make sure we get the latest index
|
||||
// since there are no matching keys.
|
||||
if err := s.ReapTombstones(6); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList(nil, "foo/bar/baz")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// List all the keys to make sure the index is also correct.
|
||||
idx, _, err = s.KVSList(nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSDeleteTreePrefix(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs entries in the state store.
|
||||
for i := 0; i < 120; i++ {
|
||||
ind := uint64(i + 1)
|
||||
key := "foo/bar" + fmt.Sprintf("%d", ind)
|
||||
testSetKey(t, s, ind, key, "bar")
|
||||
}
|
||||
testSetKey(t, s, 121, "foo/zorp", "zorp")
|
||||
|
||||
// Calling tree deletion which affects nothing does not
|
||||
// modify the table index.
|
||||
if err := s.KVSDeleteTree(129, "bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx := s.maxIndex("kvs"); idx != 121 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Call tree deletion with a nested prefix.
|
||||
if err := s.KVSDeleteTree(122, "foo/bar"); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Check that all the matching keys were deleted
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
entries, err := tx.Get("kvs", "id")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
num := 0
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
if entry.(*structs.DirEntry).Key != "foo/zorp" {
|
||||
t.Fatalf("unexpected kvs entry: %#v", entry)
|
||||
}
|
||||
num++
|
||||
}
|
||||
|
||||
if num != 1 {
|
||||
t.Fatalf("expected 1 key, got: %d", num)
|
||||
}
|
||||
|
||||
// Index should be updated if modifications are made
|
||||
if idx := s.maxIndex("kvs"); idx != 122 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Check that the tombstones ware created and that prevents the index
|
||||
// from sliding backwards.
|
||||
idx, _, err := s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 122 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Now reap the tombstones and watch the index revert to the remaining
|
||||
// foo/zorp key's index.
|
||||
if err := s.ReapTombstones(122); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
idx, _, err = s.KVSList(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 121 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVSLockDelay(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -68,6 +68,66 @@ func TestKeyWatch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKeyWatch_With_PrefixDelete(t *testing.T) {
|
||||
if consulAddr == "" {
|
||||
t.Skip()
|
||||
}
|
||||
plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
|
||||
invoke := 0
|
||||
deletedKeyWatchInvoked := 0
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil && deletedKeyWatchInvoked == 0 {
|
||||
deletedKeyWatchInvoked++
|
||||
return
|
||||
}
|
||||
if invoke == 0 {
|
||||
v, ok := raw.(*consulapi.KVPair)
|
||||
if !ok || v == nil || string(v.Value) != "test" {
|
||||
t.Fatalf("Bad: %#v", raw)
|
||||
}
|
||||
invoke++
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer plan.Stop()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
kv := plan.client.KV()
|
||||
pair := &consulapi.KVPair{
|
||||
Key: "foo/bar/baz",
|
||||
Value: []byte("test"),
|
||||
}
|
||||
_, err := kv.Put(pair, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the query to run
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Delete the key
|
||||
_, err = kv.DeleteTree("foo/bar", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
plan.Stop()
|
||||
}()
|
||||
|
||||
err := plan.Run(consulAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if invoke != 1 {
|
||||
t.Fatalf("expected watch plan to be invoked once but got %v", invoke)
|
||||
}
|
||||
|
||||
if deletedKeyWatchInvoked != 1 {
|
||||
t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeyPrefixWatch(t *testing.T) {
|
||||
if consulAddr == "" {
|
||||
t.Skip()
|
||||
|
|
Loading…
Reference in New Issue