From b4292640a5807cdb4ded60712a8c9c04a2a30380 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:27:46 -0800 Subject: [PATCH] consul: Ensure KVS List handles tombstones --- consul/kvs_endpoint.go | 42 ++++++++++++++------------- consul/state_store.go | 32 +++++++++++++++++++-- consul/state_store_test.go | 58 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 107 insertions(+), 25 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 53ed238be1..622cb110a5 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -135,32 +135,36 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e &reply.QueryMeta, state.QueryTables("KVSList"), func() error { - index, ent, err := state.KVSList(args.Key) + tombIndex, index, ent, err := state.KVSList(args.Key) if err != nil { return err } if acl != nil { ent = FilterDirEnt(acl, ent) } - if len(ent) == 0 { - // Must provide non-zero index to prevent blocking - // Index 1 is impossible anyways (due to Raft internals) - if index == 0 { - reply.Index = 1 - } else { - reply.Index = index - } - reply.Entries = nil - } else { - // Determine the maximum affected index - var maxIndex uint64 - for _, e := range ent { - if e.ModifyIndex > maxIndex { - maxIndex = e.ModifyIndex - } - } - reply.Index = maxIndex + // Determine the maximum affected index + var maxIndex uint64 + for _, e := range ent { + if e.ModifyIndex > maxIndex { + maxIndex = e.ModifyIndex + } + } + if tombIndex > maxIndex { + maxIndex = tombIndex + } + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if maxIndex == 0 { + if index > 0 { + maxIndex = index + } else { + maxIndex = 1 + } + } + reply.Index = maxIndex + + if len(ent) != 0 { reply.Entries = ent } return nil diff --git a/consul/state_store.go b/consul/state_store.go index a82d184b87..32d4b889a8 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1115,13 +1115,39 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { } // KVSList is used to list all KV entries with a prefix -func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { - idx, res, err := s.kvsTable.Get("id_prefix", prefix) +func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) { + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) + if err != nil { + return 0, 0, nil, err + } + defer tx.Abort() + + idx, err := tables.LastIndexTxn(tx) + if err != nil { + return 0, 0, nil, err + } + + res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix) + if err != nil { + return 0, 0, nil, err + } ents := make(structs.DirEntries, len(res)) for idx, r := range res { ents[idx] = r.(*structs.DirEntry) } - return idx, ents, err + + // Check for the higest index in the tombstone table + var maxIndex uint64 + res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix) + for _, r := range res { + ent := r.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } + + return maxIndex, idx, ents, err } // KVSListKeys is used to list keys with a prefix, and up to a given seperator diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 9ba723d114..54d9ffa42c 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1588,7 +1588,7 @@ func TestKVS_List(t *testing.T) { defer store.Close() // Should not exist - idx, ents, err := store.KVSList("/web") + _, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1614,7 +1614,7 @@ func TestKVS_List(t *testing.T) { } // Should list - idx, ents, err = store.KVSList("/web") + _, idx, ents, err = store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1636,6 +1636,55 @@ func TestKVS_List(t *testing.T) { } } +func TestKVSList_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the last node + err = store.KVSDeleteTree(1003, "/web/c") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Add another node + d = &structs.DirEntry{Key: "/other", Value: []byte("test")} + if err := store.KVSSet(1004, d); err != nil { + t.Fatalf("err: %v", err) + } + + // List should properly reflect tombstoned value + tombIdx, idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if tombIdx != 1003 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 2 { + t.Fatalf("bad: %v", ents) + } +} + func TestKVS_ListKeys(t *testing.T) { store, err := testStateStore() if err != nil { @@ -1852,13 +1901,16 @@ func TestKVSDeleteTree(t *testing.T) { } // Nothing should list - idx, ents, err := store.KVSList("/web") + tombIdx, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } if idx != 1010 { t.Fatalf("bad: %v", idx) } + if tombIdx != 1010 { + t.Fatalf("bad: %v", idx) + } if len(ents) != 0 { t.Fatalf("bad: %v", ents) }