Converts KVS snapshot over to iterator.

This commit is contained in:
James Phillips 2015-10-19 14:07:57 -07:00
parent 503c552d28
commit 8371c87fd0
3 changed files with 14 additions and 15 deletions

View File

@ -378,7 +378,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
return err
}
if err := s.persistKV(sink, encoder); err != nil {
if err := s.persistKVs(sink, encoder); err != nil {
sink.Cancel()
return err
}
@ -475,16 +475,16 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
return nil
}
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
entries, err := s.state.KVSDump()
iter, err := s.state.KVs()
if err != nil {
return err
}
for _, e := range entries {
for ki := iter.Next(); ki != nil; ki = iter.Next() {
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(e); err != nil {
if err := encoder.Encode(ki.(*structs.DirEntry)); err != nil {
return err
}
}

View File

@ -157,17 +157,12 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
}
// KVSDump is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVSDump() (structs.DirEntries, error) {
entries, err := s.tx.Get("kvs", "id_prefix")
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
return nil, err
}
var dump structs.DirEntries
for entry := entries.Next(); entry != nil; entry = entries.Next() {
dump = append(dump, entry.(*structs.DirEntry))
}
return dump, nil
return iter, nil
}
// TombstoneDump is used to pull all the tombstones from the graveyard.

View File

@ -3065,10 +3065,14 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
if idx := snap.LastIndex(); idx != 7 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.KVSDump()
iter, err := snap.KVs()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.DirEntries
for ki := iter.Next(); ki != nil; ki = iter.Next() {
dump = append(dump, ki.(*structs.DirEntry))
}
if !reflect.DeepEqual(dump, entries) {
t.Fatalf("bad: %#v", dump)
}