Merge pull request #9728 from hashicorp/dnephin/state-index-table

state: document how index table is used
This commit is contained in:
Daniel Nephin 2021-02-16 15:27:27 -05:00 committed by GitHub
commit 89383e2d98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 38 deletions

View File

@ -285,7 +285,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
defer tx.Abort()
// We must have initialized before this will ever be possible.
existing, err := tx.First("index", "id", "acl-token-bootstrap")
existing, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
if err != nil {
return fmt.Errorf("bootstrap check failed: %v", err)
}
@ -300,7 +300,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil {
return fmt.Errorf("failed inserting bootstrap token: %v", err)
}
if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err)
}
return tx.Commit()
@ -311,7 +311,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
tx := s.db.Txn(false)
// Lookup the bootstrap sentinel
out, err := tx.First("index", "id", "acl-token-bootstrap")
out, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
if err != nil {
return false, 0, err
}

View File

@ -324,7 +324,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// Update the node's service indexes as the node information is included
@ -557,7 +557,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
if err := tx.Delete("coordinates", coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
@ -566,7 +566,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
if err := tx.Delete("nodes", node); err != nil {
return fmt.Errorf("failed deleting node: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
@ -1367,7 +1367,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta)
if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it
if errW := tx.Delete("index", serviceIndex); errW != nil {
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}

View File

@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _
}
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{indexServiceExtinction, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err)
}
return nil
@ -86,7 +86,7 @@ func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
}
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil))
return tx.FirstWatch(tableIndex, "id", serviceIndexName(serviceName, nil))
}
func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En
}
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
return tx.First("index", "id", indexServiceExtinction)
return tx.First(tableIndex, "id", indexServiceExtinction)
}
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
@ -129,7 +129,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil

View File

@ -5,8 +5,9 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
func kvsIndexer() *memdb.StringFieldIndex {
@ -26,7 +27,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e
return fmt.Errorf("failed updating kvs index: %v", err)
}
} else {
if err := tx.Insert("index", &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err)
}
}
@ -70,7 +71,7 @@ func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta
}
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
@ -87,7 +88,7 @@ func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err)
}

View File

@ -52,14 +52,27 @@ func addTableSchemas(db *memdb.DBSchema, schemas ...func() *memdb.TableSchema) {
}
}
// indexTableSchema returns a new table schema used for tracking various indexes
// for the Raft log.
// IndexEntry keeps a record of the last index of a table or entity within a table.
type IndexEntry struct {
Key string
Value uint64
}
const tableIndex = "index"
// indexTableSchema returns a new table schema used for tracking various the
// latest raft index for a table or entities within a table.
//
// The index table is necessary for tables that do not use tombstones. If the latest
// items in the table are deleted, the max index of a table would appear to go
// backwards. With the index table we can keep track of the latest update to a
// table, even when that update is a delete of the most recent item.
func indexTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "index",
Name: tableIndex,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{

View File

@ -134,12 +134,6 @@ type Restore struct {
tx *txn
}
// IndexEntry keeps a record of the last index per-table.
type IndexEntry struct {
Key string
Value uint64
}
// sessionCheck is used to create a many-to-one table such that
// each check registered by a session can be mapped back to the
// session table. This is only used internally in the state
@ -215,16 +209,12 @@ func (s *Snapshot) LastIndex() uint64 {
}
func (s *Snapshot) Indexes() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("index", "id")
if err != nil {
return nil, err
}
return iter, nil
return s.tx.Get(tableIndex, indexID)
}
// IndexRestore is used to restore an index
func (s *Restore) IndexRestore(idx *IndexEntry) error {
if err := s.tx.Insert("index", idx); err != nil {
if err := s.tx.Insert(tableIndex, idx); err != nil {
return fmt.Errorf("index insert failed: %v", err)
}
return nil
@ -285,7 +275,7 @@ func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
var lindex uint64
for _, table := range tables {
ch, ti, err := tx.FirstWatch("index", "id", table)
ch, ti, err := tx.FirstWatch(tableIndex, "id", table)
if err != nil {
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
}
@ -300,21 +290,22 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to
// the given idx only if it's greater than the current index.
func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
ti, err := tx.First("index", "id", table)
ti, err := tx.First(tableIndex, indexID, table)
if err != nil {
return fmt.Errorf("failed to retrieve existing index: %s", err)
}
// Always take the first update, otherwise do the > check.
if ti == nil {
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
return fmt.Errorf("failed updating index %s", err)
}
} else if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
return nil
}
if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
return fmt.Errorf("failed updating index %s", err)
}
}
return nil
}