Converts sessions and ACLs over to iterators.

This commit is contained in:
James Phillips 2015-10-19 14:56:22 -07:00
parent 8371c87fd0
commit ffe531c55f
7 changed files with 85 additions and 58 deletions

View File

@ -445,14 +445,14 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sessions, err := s.state.SessionDump()
iter, err := s.state.Sessions()
if err != nil {
return err
}
for _, s := range sessions {
for si := iter.Next(); si != nil; si = iter.Next() {
sink.Write([]byte{byte(structs.SessionRequestType)})
if err := encoder.Encode(s); err != nil {
if err := encoder.Encode(si.(*structs.Session)); err != nil {
return err
}
}
@ -461,14 +461,14 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
acls, err := s.state.ACLDump()
iter, err := s.state.ACLs()
if err != nil {
return err
}
for _, s := range acls {
for ai := iter.Next(); ai != nil; ai = iter.Next() {
sink.Write([]byte{byte(structs.ACLRequestType)})
if err := encoder.Encode(s); err != nil {
if err := encoder.Encode(ai.(*structs.ACL)); err != nil {
return err
}
}
@ -493,21 +493,22 @@ func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
stones, err := s.state.TombstoneDump()
iter, err := s.state.Tombstones()
if err != nil {
return err
}
for _, s := range stones {
for ti := iter.Next(); ti != nil; ti = iter.Next() {
sink.Write([]byte{byte(structs.TombstoneRequestType)})
// For historical reasons, these are serialized in the snapshots
// as KV entries. We want to keep the snapshot format compatible
// with pre-0.6 versions for now.
stone := ti.(*state.Tombstone)
fake := &structs.DirEntry{
Key: s.Key,
Key: stone.Key,
RaftIndex: structs.RaftIndex{
ModifyIndex: s.Index,
ModifyIndex: stone.Index,
},
}
if err := encoder.Encode(fake); err != nil {

View File

@ -5,6 +5,7 @@ import (
"os"
"testing"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
)
@ -474,12 +475,19 @@ func TestFSM_SnapshotRestore(t *testing.T) {
func() {
snap := fsm2.state.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dump) != 1 {
t.Fatalf("bad: %#v", dump)
stone := iter.Next().(*state.Tombstone)
if stone == nil {
t.Fatalf("missing tombstone")
}
if stone.Key != "/remove" || stone.Index != 12 {
t.Fatalf("bad: %v", stone)
}
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
}
@ -1015,12 +1023,12 @@ func TestFSM_TombstoneReap(t *testing.T) {
// Verify the tombstones are gone
snap := fsm.state.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dump) != 0 {
t.Fatalf("bad: %#v", dump)
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}

View File

@ -590,12 +590,15 @@ func TestLeader_ReapTombstones(t *testing.T) {
func() {
snap := state.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dump) != 1 {
t.Fatalf("bad: %#v", dump)
if iter.Next() == nil {
t.Fatalf("missing tombstones")
}
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
@ -604,11 +607,11 @@ func TestLeader_ReapTombstones(t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
snap := state.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
return false, err
}
return len(dump) == 0, nil
return iter.Next() == nil, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

View File

@ -62,17 +62,13 @@ func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string) (uint64, error)
}
// DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) {
stones, err := tx.Get("tombstones", "id")
func (g *Graveyard) DumpTxn(tx *memdb.Txn) (memdb.ResultIterator, error) {
iter, err := tx.Get("tombstones", "id")
if err != nil {
return nil, fmt.Errorf("failed querying tombstones: %s", err)
return nil, err
}
var dump []*Tombstone
for stone := stones.Next(); stone != nil; stone = stones.Next() {
dump = append(dump, stone.(*Tombstone))
}
return dump, nil
return iter, nil
}
// RestoreTxn is used when restoring from a snapshot. For general inserts, use

View File

@ -199,10 +199,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(false)
defer tx.Abort()
dump, err := g.DumpTxn(tx)
iter, err := g.DumpTxn(tx)
if err != nil {
t.Fatalf("err: %s", err)
}
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump
}()
@ -241,10 +245,14 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(false)
defer tx.Abort()
dump, err := g.DumpTxn(tx)
iter, err := g.DumpTxn(tx)
if err != nil {
t.Fatalf("err: %s", err)
}
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
return dump
}()
if !reflect.DeepEqual(dump, expected) {

View File

@ -156,7 +156,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
return iter, nil
}
// KVSDump is used to pull the full list of KVS entries for use during snapshots.
// KVs is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil {
@ -165,28 +165,27 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
return iter, nil
}
// TombstoneDump is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) TombstoneDump() ([]*Tombstone, error) {
// Tombstones is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
return s.store.kvsGraveyard.DumpTxn(s.tx)
}
// SessionDump is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) SessionDump() (structs.Sessions, error) {
sessions, err := s.tx.Get("sessions", "id")
// Sessions is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err)
return nil, err
}
var dump structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
dump = append(dump, session.(*structs.Session))
}
return dump, nil
return iter, nil
}
// ACLDump is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLDump() (structs.ACLs, error) {
return aclListTxn(s.tx)
// ACLs is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("acls", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// maxIndex is a helper used to retrieve the highest known index
@ -2094,7 +2093,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
// Return the ACLs.
acls, err := aclListTxn(tx)
acls, err := s.aclListTxn(tx)
if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
}
@ -2103,7 +2102,7 @@ func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
// aclListTxn is used to list out all of the ACLs in the state store. This is a
// function vs. a method so it can be called from the snapshotter.
func aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
// Query all of the ACLs in the state store
acls, err := tx.Get("acls", "id")
if err != nil {

View File

@ -286,12 +286,12 @@ func TestStateStore_ReapTombstones(t *testing.T) {
// Make sure the tombstones are actually gone.
snap := s.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dump) != 0 {
t.Fatalf("bad: %#v", dump)
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
@ -3264,10 +3264,14 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
}
// Verify the snapshot.
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump []*Tombstone
for ti := iter.Next(); ti != nil; ti = iter.Next() {
dump = append(dump, ti.(*Tombstone))
}
if len(dump) != 1 {
t.Fatalf("bad %#v", dump)
}
@ -3312,12 +3316,12 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
// But make sure the tombstone is actually gone.
snap := s.Snapshot()
defer snap.Close()
dump, err := snap.TombstoneDump()
iter, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(dump) != 0 {
t.Fatalf("bad %#v", dump)
if iter.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
}
@ -3656,10 +3660,14 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
if idx := snap.LastIndex(); idx != 7 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.SessionDump()
iter, err := snap.Sessions()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.Sessions
for si := iter.Next(); si != nil; si = iter.Next() {
dump = append(dump, si.(*structs.Session))
}
if !reflect.DeepEqual(dump, sessions) {
t.Fatalf("bad: %#v", dump)
}
@ -4319,10 +4327,14 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
if idx := snap.LastIndex(); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.ACLDump()
iter, err := snap.ACLs()
if err != nil {
t.Fatalf("err: %s", err)
}
var dump structs.ACLs
for ai := iter.Next(); ai != nil; ai = iter.Next() {
dump = append(dump, ai.(*structs.ACL))
}
if !reflect.DeepEqual(dump, acls) {
t.Fatalf("bad: %#v", dump)
}