diff --git a/consul/fsm.go b/consul/fsm.go index f8451913f2..5e8c980e2c 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -290,11 +290,30 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { nodes = nil // Dump the KVS entries - dirents := s.state.KVSDump() - for _, ent := range dirents { - // Register the node itself - sink.Write([]byte{byte(structs.KVSRequestType)}) - if err := encoder.Encode(ent); err != nil { + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.KVSDump(streamCh); err != nil { + errorCh <- err + } + }() + +OUTER: + for { + select { + case raw := <-streamCh: + if raw == nil { + break OUTER + } + ent := raw.(*structs.DirEntry) + + sink.Write([]byte{byte(structs.KVSRequestType)}) + if err := encoder.Encode(ent); err != nil { + sink.Cancel() + return err + } + + case err := <-errorCh: sink.Cancel() return err } diff --git a/consul/state_store.go b/consul/state_store.go index be18995a15..4b08c04640 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -923,16 +923,9 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { return checks } -// KVSDump is used to list all KV entries -func (s *StateSnapshot) KVSDump() structs.DirEntries { - res, err := s.store.kvsTable.GetTxn(s.tx, "id") - if err != nil { - s.store.logger.Printf("[ERR] consul.state: Failed to get KVS entries: %v", err) - return nil - } - ents := make(structs.DirEntries, len(res)) - for idx, r := range res { - ents[idx] = r.(*structs.DirEntry) - } - return ents +// KVSDump is used to list all KV entries. It takes a channel and streams +// back *struct.DirEntry objects. This will block and should be invoked +// in a goroutine. +func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { + return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index e03e0ad76d..5e5bf3e8e1 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -602,7 +602,23 @@ func TestStoreSnapshot(t *testing.T) { } // Check we have the entries - ents := snap.KVSDump() + streamCh := make(chan interface{}, 64) + doneCh := make(chan struct{}) + var ents []*structs.DirEntry + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh if len(ents) != 2 { t.Fatalf("missing KVS entries!") } @@ -661,7 +677,23 @@ func TestStoreSnapshot(t *testing.T) { } // Check we have the entries - ents = snap.KVSDump() + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh if len(ents) != 2 { t.Fatalf("missing KVS entries!") }