diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 5e6b1ad370..db01580e42 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -414,23 +414,38 @@ func TestFSM_SnapshotRestore(t *testing.T) { t.Fatalf("bad: %v", d) } + // Verify the index is restored + idx, _, err := fsm.state.KVSListKeys("/blah", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx <= 1 { + t.Fatalf("bad index: %d", idx) + } + // Verify session is restored - _, s, err := fsm.state.SessionGet(session.ID) + idx, s, err := fsm.state.SessionGet(session.ID) if err != nil { t.Fatalf("err: %v", err) } if s.Node != "foo" { t.Fatalf("bad: %v", s) } + if idx <= 1 { + t.Fatalf("bad index: %d", idx) + } // Verify ACL is restored - _, a, err := fsm.state.ACLGet(acl.ID) + idx, a, err := fsm.state.ACLGet(acl.ID) if err != nil { t.Fatalf("err: %v", err) } if a.Name != "User Token" { t.Fatalf("bad: %v", a) } + if idx <= 1 { + t.Fatalf("bad index: %d", idx) + } } func TestFSM_KVSSet(t *testing.T) { diff --git a/consul/state_store.go b/consul/state_store.go index 030f1d55c9..7ae9e4735a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1060,6 +1060,9 @@ func (s *StateStore) KVSRestore(d *structs.DirEntry) error { if err := s.kvsTable.InsertTxn(tx, d); err != nil { return err } + if err := s.kvsTable.SetMaxLastIndexTxn(tx, d.ModifyIndex); err != nil { + return err + } return tx.Commit() } @@ -1096,10 +1099,18 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er return 0, nil, err } + // Ensure a non-zero index + if idx == 0 { + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + idx = 1 + } + // Aggregate the stream stream := make(chan interface{}, 128) done := make(chan struct{}) var keys []string + var maxIndex uint64 go func() { prefixLen := len(prefix) sepLen := len(seperator) @@ -1108,6 +1119,11 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er ent := raw.(*structs.DirEntry) after := ent.Key[prefixLen:] + // Update the hightest index we've seen + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + // If there is no seperator, always accumulate if sepLen == 0 { keys = append(keys, ent.Key) @@ -1131,6 +1147,11 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er // Start the stream, and wait for completion err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) <-done + + // Use the maxIndex if we have any keys + if maxIndex != 0 { + idx = maxIndex + } return idx, keys, err } @@ -1618,6 +1639,9 @@ func (s *StateStore) ACLRestore(acl *structs.ACL) error { if err := s.aclTable.InsertTxn(tx, acl); err != nil { return err } + if err := s.aclTable.SetMaxLastIndexTxn(tx, acl.ModifyIndex); err != nil { + return err + } return tx.Commit() } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 323fe1b6ec..9480cf1b13 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1554,7 +1554,7 @@ func TestKVS_ListKeys(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if idx != 0 { + if idx != 1 { t.Fatalf("bad: %v", idx) } if len(keys) != 0 { @@ -1657,6 +1657,65 @@ func TestKVS_ListKeys(t *testing.T) { } } +func TestKVS_ListKeys_Index(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1003, d); err != nil { + t.Fatalf("err: %v", err) + } + + idx, keys, err := store.KVSListKeys("/foo", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/ba", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1002 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 2 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/nope", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1003 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 0 { + t.Fatalf("bad: %v", keys) + } +} + func TestKVSDeleteTree(t *testing.T) { store, err := testStateStore() if err != nil {