consul: Fixing key list index calculation

This commit is contained in:
Armon Dadgar 2014-11-12 17:55:45 -08:00
parent bd612b7bc2
commit 8a1969cc8c
3 changed files with 101 additions and 3 deletions

View File

@ -414,23 +414,38 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("bad: %v", d) t.Fatalf("bad: %v", d)
} }
// Verify the index is restored
idx, _, err := fsm.state.KVSListKeys("/blah", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify session is restored // Verify session is restored
_, s, err := fsm.state.SessionGet(session.ID) idx, s, err := fsm.state.SessionGet(session.ID)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if s.Node != "foo" { if s.Node != "foo" {
t.Fatalf("bad: %v", s) t.Fatalf("bad: %v", s)
} }
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify ACL is restored // Verify ACL is restored
_, a, err := fsm.state.ACLGet(acl.ID) idx, a, err := fsm.state.ACLGet(acl.ID)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if a.Name != "User Token" { if a.Name != "User Token" {
t.Fatalf("bad: %v", a) t.Fatalf("bad: %v", a)
} }
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
} }
func TestFSM_KVSSet(t *testing.T) { func TestFSM_KVSSet(t *testing.T) {

View File

@ -1060,6 +1060,9 @@ func (s *StateStore) KVSRestore(d *structs.DirEntry) error {
if err := s.kvsTable.InsertTxn(tx, d); err != nil { if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return err return err
} }
if err := s.kvsTable.SetMaxLastIndexTxn(tx, d.ModifyIndex); err != nil {
return err
}
return tx.Commit() return tx.Commit()
} }
@ -1096,10 +1099,18 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
return 0, nil, err return 0, nil, err
} }
// Ensure a non-zero index
if idx == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
idx = 1
}
// Aggregate the stream // Aggregate the stream
stream := make(chan interface{}, 128) stream := make(chan interface{}, 128)
done := make(chan struct{}) done := make(chan struct{})
var keys []string var keys []string
var maxIndex uint64
go func() { go func() {
prefixLen := len(prefix) prefixLen := len(prefix)
sepLen := len(seperator) sepLen := len(seperator)
@ -1108,6 +1119,11 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
ent := raw.(*structs.DirEntry) ent := raw.(*structs.DirEntry)
after := ent.Key[prefixLen:] after := ent.Key[prefixLen:]
// Update the hightest index we've seen
if ent.ModifyIndex > maxIndex {
maxIndex = ent.ModifyIndex
}
// If there is no seperator, always accumulate // If there is no seperator, always accumulate
if sepLen == 0 { if sepLen == 0 {
keys = append(keys, ent.Key) keys = append(keys, ent.Key)
@ -1131,6 +1147,11 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
// Start the stream, and wait for completion // Start the stream, and wait for completion
err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix)
<-done <-done
// Use the maxIndex if we have any keys
if maxIndex != 0 {
idx = maxIndex
}
return idx, keys, err return idx, keys, err
} }
@ -1618,6 +1639,9 @@ func (s *StateStore) ACLRestore(acl *structs.ACL) error {
if err := s.aclTable.InsertTxn(tx, acl); err != nil { if err := s.aclTable.InsertTxn(tx, acl); err != nil {
return err return err
} }
if err := s.aclTable.SetMaxLastIndexTxn(tx, acl.ModifyIndex); err != nil {
return err
}
return tx.Commit() return tx.Commit()
} }

View File

@ -1554,7 +1554,7 @@ func TestKVS_ListKeys(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx != 0 { if idx != 1 {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
if len(keys) != 0 { if len(keys) != 0 {
@ -1657,6 +1657,65 @@ func TestKVS_ListKeys(t *testing.T) {
} }
} }
func TestKVS_ListKeys_Index(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entries
d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1003, d); err != nil {
t.Fatalf("err: %v", err)
}
idx, keys, err := store.KVSListKeys("/foo", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1000 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 1 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/ba", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1002 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 2 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/nope", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1003 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 0 {
t.Fatalf("bad: %v", keys)
}
}
func TestKVSDeleteTree(t *testing.T) { func TestKVSDeleteTree(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {