From e7abf17db457fd0547dcfeb5d7d0ad7df100834a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:37:45 -0800 Subject: [PATCH] consul: List Keys should handle tombstones --- consul/state_store.go | 21 +++++++++++-- consul/state_store_test.go | 62 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 32d4b889a8..feeec3ca03 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1152,7 +1152,8 @@ func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, // KVSListKeys is used to list keys with a prefix, and up to a given seperator func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { - tx, err := s.kvsTable.StartTxn(true, nil) + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) if err != nil { return 0, nil, err } @@ -1172,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er // Aggregate the stream stream := make(chan interface{}, 128) + streamTomb := make(chan interface{}, 128) done := make(chan struct{}) var keys []string var maxIndex uint64 @@ -1205,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er keys = append(keys, ent.Key) } } + + // Handle the tombstones for any index updates + for raw := range streamTomb { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } close(done) }() // Start the stream, and wait for completion - err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) + if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } + if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } <-done // Use the maxIndex if we have any keys if maxIndex != 0 { idx = maxIndex } - return idx, keys, err + return idx, keys, nil } // KVSDelete is used to delete a KVS entry diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 54d9ffa42c..0ad29c3704 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1859,6 +1859,68 @@ func TestKVS_ListKeys_Index(t *testing.T) { } } +func TestKVS_ListKeys_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1003, d); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.KVSDelete(1004, "/baz/c"); err != nil { + t.Fatalf("err: %v", err) + } + + idx, keys, err := store.KVSListKeys("/foo", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/ba", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/nope", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 0 { + t.Fatalf("bad: %v", keys) + } +} + func TestKVSDeleteTree(t *testing.T) { store, err := testStateStore() if err != nil {