consul: List Keys should handle tombstones

This commit is contained in:
Armon Dadgar 2014-12-18 16:37:45 -08:00
parent b4292640a5
commit e7abf17db4
2 changed files with 80 additions and 3 deletions

View File

@ -1152,7 +1152,8 @@ func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries,
// KVSListKeys is used to list keys with a prefix, and up to a given seperator // KVSListKeys is used to list keys with a prefix, and up to a given seperator
func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) {
tx, err := s.kvsTable.StartTxn(true, nil) tables := MDBTables{s.kvsTable, s.tombstoneTable}
tx, err := tables.StartTxn(true)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -1172,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
// Aggregate the stream // Aggregate the stream
stream := make(chan interface{}, 128) stream := make(chan interface{}, 128)
streamTomb := make(chan interface{}, 128)
done := make(chan struct{}) done := make(chan struct{})
var keys []string var keys []string
var maxIndex uint64 var maxIndex uint64
@ -1205,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
keys = append(keys, ent.Key) keys = append(keys, ent.Key)
} }
} }
// Handle the tombstones for any index updates
for raw := range streamTomb {
ent := raw.(*structs.DirEntry)
if ent.ModifyIndex > maxIndex {
maxIndex = ent.ModifyIndex
}
}
close(done) close(done)
}() }()
// Start the stream, and wait for completion // Start the stream, and wait for completion
err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil {
return 0, nil, err
}
if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil {
return 0, nil, err
}
<-done <-done
// Use the maxIndex if we have any keys // Use the maxIndex if we have any keys
if maxIndex != 0 { if maxIndex != 0 {
idx = maxIndex idx = maxIndex
} }
return idx, keys, err return idx, keys, nil
} }
// KVSDelete is used to delete a KVS entry // KVSDelete is used to delete a KVS entry

View File

@ -1859,6 +1859,68 @@ func TestKVS_ListKeys_Index(t *testing.T) {
} }
} }
func TestKVS_ListKeys_TombstoneIndex(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entries
d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1003, d); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.KVSDelete(1004, "/baz/c"); err != nil {
t.Fatalf("err: %v", err)
}
idx, keys, err := store.KVSListKeys("/foo", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1000 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 1 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/ba", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 1 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/nope", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 0 {
t.Fatalf("bad: %v", keys)
}
}
func TestKVSDeleteTree(t *testing.T) { func TestKVSDeleteTree(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {