mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
refactor session state store tables to use the new index pattern (#11525)
* state: port KV and Tombstone tables to new pattern * go fmt'ed * handle wildcards for tombstones * Fix graveyard ent vs oss * fix oss compilation error * add partition to tombstones and kv state store indexes * refactor to use `indexWithEnterpriseIndexable` * Apply suggestions from code review Co-authored-by: Chris S. Kim <ckim@hashicorp.com> Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com> * add `singleValueID` implementation assertions * partition `tableSessions` table * fix sessions to use UUID and fix prefix index * fix oss build * clean up unused functions * fix oss compilation * add a partition indexer for sessions * Fix oss to not have partition index * fix oss tests * remove unused func `prefixIndexFromServiceNameAsString` * fix test error check * remove unused operations_ent.go and operations_oss.go func * remove unused const Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> Co-authored-by: Chris S. Kim <ckim@hashicorp.com> Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
This commit is contained in:
parent
98735a6d12
commit
7916268c40
@ -30,7 +30,7 @@ func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombsto
|
||||
// given context, using a prefix match.
|
||||
func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, _ *structs.EnterpriseMeta) (uint64, error) {
|
||||
var lindex uint64
|
||||
q := Query{Value: prefix}
|
||||
q := Query{Value: prefix, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()}
|
||||
stones, err := tx.Get(tableTombstones, indexID+"_prefix", q)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed querying tombstones: %s", err)
|
||||
|
@ -148,6 +148,7 @@ type singleValueID interface {
|
||||
var _ singleValueID = (*structs.DirEntry)(nil)
|
||||
var _ singleValueID = (*Tombstone)(nil)
|
||||
var _ singleValueID = (*Query)(nil)
|
||||
var _ singleValueID = (*structs.Session)(nil)
|
||||
|
||||
func (b *indexBuilder) Bool(v bool) {
|
||||
b.Raw([]byte{intFromBool(v)})
|
||||
|
@ -39,8 +39,8 @@ func kvsTableSchema() *memdb.TableSchema {
|
||||
}
|
||||
}
|
||||
|
||||
// indexFromKVEntry creates an index key from any struct that implements singleValueID
|
||||
func indexFromKVEntry(raw interface{}) ([]byte, error) {
|
||||
// indexFromIDValue creates an index key from any struct that implements singleValueID
|
||||
func indexFromIDValue(raw interface{}) ([]byte, error) {
|
||||
e, ok := raw.(singleValueID)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw)
|
||||
@ -431,7 +431,7 @@ func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error)
|
||||
}
|
||||
|
||||
// Verify that the session exists.
|
||||
sess, err := firstWithTxn(tx, "sessions", "id", entry.Session, &entry.EnterpriseMeta)
|
||||
sess, err := tx.First(tableSessions, indexID, Query{Value: entry.Session, EnterpriseMeta: entry.EnterpriseMeta})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
|
@ -14,12 +14,31 @@ import (
|
||||
|
||||
func kvsIndexer() indexerSingleWithPrefix {
|
||||
return indexerSingleWithPrefix{
|
||||
readIndex: readIndex(indexFromKVEntry),
|
||||
writeIndex: writeIndex(indexFromKVEntry),
|
||||
prefixIndex: prefixIndex(prefixIndexForKVEntry),
|
||||
readIndex: readIndex(indexFromIDValue),
|
||||
writeIndex: writeIndex(indexFromIDValue),
|
||||
prefixIndex: prefixIndex(prefixIndexForIDValue),
|
||||
}
|
||||
}
|
||||
|
||||
func prefixIndexForIDValue(arg interface{}) ([]byte, error) {
|
||||
switch v := arg.(type) {
|
||||
// DeletePrefix always uses a string, pass it along unmodified
|
||||
case string:
|
||||
return []byte(v), nil
|
||||
case structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case singleValueID:
|
||||
var b indexBuilder
|
||||
if v.IDValue() != "" {
|
||||
// Omit null terminator, because we want to prefix match keys
|
||||
b.String(v.IDValue())
|
||||
}
|
||||
prefix := bytes.Trim(b.Bytes(), "\x00")
|
||||
return prefix, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg)
|
||||
}
|
||||
|
||||
func prefixIndexForKVEntry(arg interface{}) ([]byte, error) {
|
||||
var b indexBuilder
|
||||
switch v := arg.(type) {
|
||||
|
@ -1,3 +1,4 @@
|
||||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
@ -8,24 +9,6 @@ import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func firstWithTxn(tx ReadTxn,
|
||||
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
|
||||
|
||||
return tx.First(table, index, idxVal)
|
||||
}
|
||||
|
||||
func firstWatchWithTxn(tx ReadTxn,
|
||||
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||
|
||||
return tx.FirstWatch(table, index, idxVal)
|
||||
}
|
||||
|
||||
func getWithTxn(tx ReadTxn,
|
||||
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
|
||||
return tx.Get(table, index, idxVal)
|
||||
}
|
||||
|
||||
func getCompoundWithTxn(tx ReadTxn, table, index string,
|
||||
_ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) {
|
||||
|
||||
|
@ -206,7 +206,7 @@ func preparedQuerySetTxn(tx WriteTxn, idx uint64, query *structs.PreparedQuery)
|
||||
|
||||
// Verify that the session exists.
|
||||
if query.Session != "" {
|
||||
sess, err := firstWithTxn(tx, "sessions", "id", query.Session, nil)
|
||||
sess, err := tx.First(tableSessions, indexID, Query{Value: query.Session, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()})
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid session: %v", err)
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
@ -17,6 +18,9 @@ func prefixIndexFromQuery(arg interface{}) ([]byte, error) {
|
||||
case structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case Query:
|
||||
if v.Value == "" {
|
||||
return nil, nil
|
||||
}
|
||||
b.String(strings.ToLower(v.Value))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
@ -28,21 +32,6 @@ func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
|
||||
return prefixIndexFromQuery(arg)
|
||||
}
|
||||
|
||||
func prefixIndexFromServiceNameAsString(arg interface{}) ([]byte, error) {
|
||||
var b indexBuilder
|
||||
switch v := arg.(type) {
|
||||
case *structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case structs.EnterpriseMeta:
|
||||
return nil, nil
|
||||
case structs.ServiceName:
|
||||
b.String(strings.ToLower(v.String()))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
|
||||
}
|
||||
|
||||
// indexFromAuthMethodQuery builds an index key where Query.Value is lowercase, and is
|
||||
// a required value.
|
||||
func indexFromAuthMethodQuery(arg interface{}) ([]byte, error) {
|
||||
|
@ -11,20 +11,40 @@ import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
tableSessions = "sessions"
|
||||
)
|
||||
|
||||
func indexFromSession(raw interface{}) ([]byte, error) {
|
||||
e, ok := raw.(*structs.Session)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw)
|
||||
}
|
||||
|
||||
v := strings.ToLower(e.ID)
|
||||
if v == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(v)
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
// sessionsTableSchema returns a new table schema used for storing session
|
||||
// information.
|
||||
func sessionsTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: "sessions",
|
||||
Name: tableSessions,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": {
|
||||
Name: "id",
|
||||
indexID: {
|
||||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: sessionIndexer(),
|
||||
},
|
||||
"node": {
|
||||
Name: "node",
|
||||
indexNode: {
|
||||
Name: indexNode,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: nodeSessionsIndexer(),
|
||||
@ -132,7 +152,7 @@ func (index *CheckIDIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
|
||||
|
||||
// Sessions is used to pull the full list of sessions for use during snapshots.
|
||||
func (s *Snapshot) Sessions() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get("sessions", "id")
|
||||
iter, err := s.tx.Get(tableSessions, indexID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -227,7 +247,11 @@ func (s *Store) SessionGet(ws memdb.WatchSet,
|
||||
idx := sessionMaxIndex(tx, entMeta)
|
||||
|
||||
// Look up the session by its ID
|
||||
watchCh, session, err := firstWatchWithTxn(tx, "sessions", "id", sessionID, entMeta)
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
}
|
||||
watchCh, session, err := tx.FirstWatch(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta})
|
||||
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
@ -239,29 +263,6 @@ func (s *Store) SessionGet(ws memdb.WatchSet,
|
||||
return idx, nil, nil
|
||||
}
|
||||
|
||||
// SessionList returns a slice containing all of the active sessions.
|
||||
func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := sessionMaxIndex(tx, entMeta)
|
||||
|
||||
// Query all of the active sessions.
|
||||
sessions, err := getWithTxn(tx, "sessions", "id_prefix", "", entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
ws.Add(sessions.WatchCh())
|
||||
|
||||
// Go over the sessions and create a slice of them.
|
||||
var result structs.Sessions
|
||||
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||
result = append(result, session.(*structs.Session))
|
||||
}
|
||||
return idx, result, nil
|
||||
}
|
||||
|
||||
// NodeSessions returns a set of active sessions associated
|
||||
// with the given node ID. The returned index is the highest
|
||||
// index seen from the result set.
|
||||
@ -299,7 +300,10 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
|
||||
// session deletion and handle session invalidation, etc.
|
||||
func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
|
||||
// Look up the session.
|
||||
sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta)
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
}
|
||||
sess, err := tx.First(tableSessions, indexID, Query{Value: sessionID, EnterpriseMeta: *entMeta})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
@ -11,9 +12,11 @@ import (
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
func sessionIndexer() *memdb.UUIDFieldIndex {
|
||||
return &memdb.UUIDFieldIndex{
|
||||
Field: "ID",
|
||||
func sessionIndexer() indexerSingleWithPrefix {
|
||||
return indexerSingleWithPrefix{
|
||||
readIndex: readIndex(indexFromQuery),
|
||||
writeIndex: writeIndex(indexFromSession),
|
||||
prefixIndex: prefixIndex(prefixIndexFromQuery),
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +40,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex {
|
||||
}
|
||||
|
||||
func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error {
|
||||
if err := tx.Delete("sessions", session); err != nil {
|
||||
if err := tx.Delete(tableSessions, session); err != nil {
|
||||
return fmt.Errorf("failed deleting session: %s", err)
|
||||
}
|
||||
|
||||
@ -50,7 +53,7 @@ func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64)
|
||||
}
|
||||
|
||||
func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateMax bool, _ bool) error {
|
||||
if err := tx.Insert("sessions", session); err != nil {
|
||||
if err := tx.Insert(tableSessions, session); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -88,7 +91,7 @@ func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) {
|
||||
func nodeSessionsTxn(tx ReadTxn,
|
||||
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
|
||||
|
||||
sessions, err := tx.Get("sessions", "node", node)
|
||||
sessions, err := tx.Get(tableSessions, indexNode, node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
@ -124,3 +127,27 @@ func validateSessionChecksTxn(tx ReadTxn, session *structs.Session) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SessionList returns a slice containing all of the active sessions.
|
||||
func (s *Store) SessionList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Sessions, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := sessionMaxIndex(tx, entMeta)
|
||||
|
||||
var result structs.Sessions
|
||||
|
||||
// Query all of the active sessions.
|
||||
sessions, err := tx.Get(tableSessions, indexID+"_prefix", Query{})
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||
}
|
||||
ws.Add(sessions.WatchCh())
|
||||
// Go over the sessions and create a slice of them.
|
||||
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||
result = append(result, session.(*structs.Session))
|
||||
}
|
||||
|
||||
return idx, result, nil
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ func TestStateStore_SessionDestroy(t *testing.T) {
|
||||
|
||||
// Make sure the session is really gone.
|
||||
tx := s.db.Txn(false)
|
||||
sessions, err := tx.Get("sessions", "id")
|
||||
sessions, err := tx.Get(tableSessions, indexID)
|
||||
if err != nil || sessions.Next() != nil {
|
||||
t.Fatalf("session should not exist")
|
||||
}
|
||||
|
@ -2325,6 +2325,11 @@ type ServiceCheck struct {
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// IDValue implements the state.singleValueID interface for indexing.
|
||||
func (s *Session) IDValue() string {
|
||||
return s.ID
|
||||
}
|
||||
|
||||
func (s *Session) UnmarshalJSON(data []byte) (err error) {
|
||||
type Alias Session
|
||||
aux := &struct {
|
||||
|
@ -495,7 +495,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
|
||||
if resp.Code != 409 {
|
||||
t.Fatalf("expected 409, got %d", resp.Code)
|
||||
}
|
||||
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) {
|
||||
if !bytes.Contains(resp.Body.Bytes(), []byte("invalid session")) {
|
||||
t.Fatalf("bad: %s", resp.Body.String())
|
||||
}
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user