Partition session checks store (#11638)

* state: port KV and Tombstone tables to new pattern

* go fmt'ed

* handle wildcards for tombstones

* Fix graveyard ent vs oss

* fix oss compilation error

* add partition to tombstones and kv state store indexes

* refactor to use `indexWithEnterpriseIndexable`

* Apply suggestions from code review

Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* add `singleValueID` implementation assertions

* partition `tableSessions` table

* fix sessions to use UUID and fix prefix index

* fix oss build

* clean up unused functions

* fix oss compilation

* add a partition indexer for sessions

* Fix oss to not have partition index

* fix oss tests

* remove unused operations_ent.go and operations_oss.go func

* remove unused const

* convert `IndexID` of `session_checks` table

* convert `indexSession` of `session_checks` table

* convert `indexNodeCheck` of `session_checks` table

* partition `indexID` and `indexSession` of `tableSessionChecks`

* fix oss linter

* fix review comments

* remove partition for Checks as it's always use the session partition

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
This commit is contained in:
Dhia Ayachi 2021-11-24 09:10:38 -05:00 committed by GitHub
parent 2350e7e56a
commit bb83624950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 208 additions and 63 deletions

View File

@ -2562,7 +2562,8 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
// checkSessionsTxn returns the IDs of all sessions associated with a health check // checkSessionsTxn returns the IDs of all sessions associated with a health check
func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) { func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) {
mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) mappings, err := tx.Get(tableSessionChecks, indexNodeCheck, MultiQuery{Value: []string{hc.Node, string(hc.CheckID)},
EnterpriseMeta: hc.EnterpriseMeta})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed session checks lookup: %s", err) return nil, fmt.Errorf("failed session checks lookup: %s", err)
} }

View File

@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -145,11 +146,50 @@ type singleValueID interface {
NamespaceOrDefault() string NamespaceOrDefault() string
} }
type multiValueID interface {
IDValue() []string
PartitionOrDefault() string
NamespaceOrDefault() string
}
var _ singleValueID = (*structs.DirEntry)(nil) var _ singleValueID = (*structs.DirEntry)(nil)
var _ singleValueID = (*Tombstone)(nil) var _ singleValueID = (*Tombstone)(nil)
var _ singleValueID = (*Query)(nil) var _ singleValueID = (*Query)(nil)
var _ singleValueID = (*structs.Session)(nil) var _ singleValueID = (*structs.Session)(nil)
// indexFromIDValue creates an index key from any struct that implements singleValueID
func indexFromIDValueLowerCase(raw interface{}) ([]byte, error) {
e, ok := raw.(singleValueID)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw)
}
v := strings.ToLower(e.IDValue())
if v == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(v)
return b.Bytes(), nil
}
// indexFromIDValue creates an index key from any struct that implements singleValueID
func indexFromMultiValueID(raw interface{}) ([]byte, error) {
e, ok := raw.(multiValueID)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement multiValueID", raw)
}
var b indexBuilder
for _, v := range e.IDValue() {
if v == "" {
return nil, errMissingValueForIndex
}
b.String(strings.ToLower(v))
}
return b.Bytes(), nil
}
func (b *indexBuilder) Bool(v bool) { func (b *indexBuilder) Bool(v bool) {
b.Raw([]byte{intFromBool(v)}) b.Raw([]byte{intFromBool(v)})
} }

View File

@ -39,23 +39,6 @@ func prefixIndexForIDValue(arg interface{}) ([]byte, error) {
return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg) return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg)
} }
func prefixIndexForKVEntry(arg interface{}) ([]byte, error) {
var b indexBuilder
switch v := arg.(type) {
// DeletePrefix always uses a string, pass it along unmodified
case string:
return []byte(v), nil
case structs.EnterpriseMeta:
return nil, nil
case singleValueID:
// Omit null terminator, because we want to prefix match keys
(*bytes.Buffer)(&b).WriteString(v.IDValue())
return b.Bytes(), nil
}
return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg)
}
func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) error { func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) error {
if err := tx.Insert(tableKVs, entry); err != nil { if err := tx.Insert(tableKVs, entry); err != nil {
return err return err

View File

@ -31,6 +31,27 @@ func (q Query) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault() return q.EnterpriseMeta.PartitionOrDefault()
} }
type MultiQuery struct {
Value []string
structs.EnterpriseMeta
}
func (q MultiQuery) IDValue() []string {
return q.Value
}
// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q MultiQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q MultiQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// indexFromQuery builds an index key where Query.Value is lowercase, and is // indexFromQuery builds an index key where Query.Value is lowercase, and is
// a required value. // a required value.
func indexFromQuery(arg interface{}) ([]byte, error) { func indexFromQuery(arg interface{}) ([]byte, error) {

View File

@ -12,13 +12,16 @@ import (
) )
const ( const (
tableSessions = "sessions" tableSessions = "sessions"
tableSessionChecks = "session_checks"
indexNodeCheck = "node_check"
) )
func indexFromSession(raw interface{}) ([]byte, error) { func indexFromSession(raw interface{}) ([]byte, error) {
e, ok := raw.(*structs.Session) e, ok := raw.(*structs.Session)
if !ok { if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw) return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw)
} }
v := strings.ToLower(e.ID) v := strings.ToLower(e.ID)
@ -57,43 +60,93 @@ func sessionsTableSchema() *memdb.TableSchema {
// checks. // checks.
func sessionChecksTableSchema() *memdb.TableSchema { func sessionChecksTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{ return &memdb.TableSchema{
Name: "session_checks", Name: tableSessionChecks,
Indexes: map[string]*memdb.IndexSchema{ Indexes: map[string]*memdb.IndexSchema{
"id": { indexID: {
Name: "id", Name: indexID,
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.CompoundIndex{ Indexer: idCheckIndexer(),
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&CheckIDIndex{},
&memdb.UUIDFieldIndex{
Field: "Session",
},
},
},
}, },
"node_check": { indexNodeCheck: {
Name: "node_check", Name: indexNodeCheck,
AllowMissing: false, AllowMissing: false,
Unique: false, Unique: false,
Indexer: nodeChecksIndexer(), Indexer: nodeChecksIndexer(),
}, },
"session": { indexSession: {
Name: "session", Name: indexSession,
AllowMissing: false, AllowMissing: false,
Unique: false, Unique: false,
Indexer: &memdb.UUIDFieldIndex{ Indexer: sessionCheckIndexer(),
Field: "Session",
},
}, },
}, },
} }
} }
// indexNodeFromSession creates an index key from *structs.Session
func indexNodeFromSession(raw interface{}) ([]byte, error) {
e, ok := raw.(*structs.Session)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw)
}
v := strings.ToLower(e.Node)
if v == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(v)
return b.Bytes(), nil
}
// indexFromNodeCheckIDSession creates an index key from sessionCheck
func indexFromNodeCheckIDSession(raw interface{}) ([]byte, error) {
e, ok := raw.(*sessionCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement sessionCheck", raw)
}
var b indexBuilder
v := strings.ToLower(e.Node)
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
v = strings.ToLower(string(e.CheckID.ID))
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
v = strings.ToLower(e.Session)
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
return b.Bytes(), nil
}
// indexSessionCheckFromSession creates an index key from sessionCheck
func indexSessionCheckFromSession(raw interface{}) ([]byte, error) {
e, ok := raw.(*sessionCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement *sessionCheck", raw)
}
var b indexBuilder
v := strings.ToLower(e.Session)
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
return b.Bytes(), nil
}
type CheckIDIndex struct { type CheckIDIndex struct {
} }
@ -371,8 +424,11 @@ func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entM
return fmt.Errorf("unknown session behavior %#v", session.Behavior) return fmt.Errorf("unknown session behavior %#v", session.Behavior)
} }
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
// Delete any check mappings. // Delete any check mappings.
mappings, err := tx.Get("session_checks", "session", sessionID) mappings, err := tx.Get(tableSessionChecks, indexSession, Query{Value: sessionID, EnterpriseMeta: *entMeta})
if err != nil { if err != nil {
return fmt.Errorf("failed session checks lookup: %s", err) return fmt.Errorf("failed session checks lookup: %s", err)
} }
@ -384,7 +440,7 @@ func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entM
// Do the delete in a separate loop so we don't trash the iterator. // Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs { for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil { if err := tx.Delete(tableSessionChecks, obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err) return fmt.Errorf("failed deleting session check: %s", err)
} }
} }

View File

@ -5,6 +5,7 @@ package state
import ( import (
"fmt" "fmt"
"strings"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
@ -20,25 +21,56 @@ func sessionIndexer() indexerSingleWithPrefix {
} }
} }
func nodeSessionsIndexer() *memdb.StringFieldIndex { func nodeSessionsIndexer() indexerSingle {
return &memdb.StringFieldIndex{ return indexerSingle{
Field: "Node", readIndex: readIndex(indexFromIDValueLowerCase),
Lowercase: true, writeIndex: writeIndex(indexNodeFromSession),
} }
} }
func nodeChecksIndexer() *memdb.CompoundIndex { func idCheckIndexer() indexerSingle {
return &memdb.CompoundIndex{ return indexerSingle{
Indexes: []memdb.Indexer{ readIndex: indexFromNodeCheckIDSession,
&memdb.StringFieldIndex{ writeIndex: indexFromNodeCheckIDSession,
Field: "Node",
Lowercase: true,
},
&CheckIDIndex{},
},
} }
} }
func sessionCheckIndexer() indexerSingle {
return indexerSingle{
readIndex: indexFromQuery,
writeIndex: indexSessionCheckFromSession,
}
}
func nodeChecksIndexer() indexerSingle {
return indexerSingle{
readIndex: indexFromMultiValueID,
writeIndex: indexFromNodeCheckID,
}
}
// indexFromNodeCheckID creates an index key from a sessionCheck structure
func indexFromNodeCheckID(raw interface{}) ([]byte, error) {
e, ok := raw.(*sessionCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw)
}
var b indexBuilder
v := strings.ToLower(e.Node)
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
v = strings.ToLower(string(e.CheckID.ID))
if v == "" {
return nil, errMissingValueForIndex
}
b.String(v)
return b.Bytes(), nil
}
func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error { func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error {
if err := tx.Delete(tableSessions, session); err != nil { if err := tx.Delete(tableSessions, session); err != nil {
return fmt.Errorf("failed deleting session: %s", err) return fmt.Errorf("failed deleting session: %s", err)
@ -64,7 +96,7 @@ func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateM
CheckID: structs.CheckID{ID: checkID}, CheckID: structs.CheckID{ID: checkID},
Session: session.ID, Session: session.ID,
} }
if err := tx.Insert("session_checks", mapping); err != nil { if err := tx.Insert(tableSessionChecks, mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err) return fmt.Errorf("failed inserting session check mapping: %s", err)
} }
} }
@ -91,7 +123,7 @@ func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) {
func nodeSessionsTxn(tx ReadTxn, func nodeSessionsTxn(tx ReadTxn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get(tableSessions, indexNode, node) sessions, err := tx.Get(tableSessions, indexNode, Query{Value: node})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err) return nil, fmt.Errorf("failed session lookup: %s", err)
} }

View File

@ -143,7 +143,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
// Check mappings were inserted // Check mappings were inserted
{ {
check, err := tx.First("session_checks", "session", sess.ID) check, err := tx.First(tableSessionChecks, indexSession, Query{Value: sess.ID})
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -174,7 +174,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
checks, err := tx.Get("session_checks", "session", sess2.ID) checks, err := tx.Get(tableSessionChecks, indexSession, Query{Value: sess2.ID})
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -509,7 +509,7 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
check, err := tx.First("session_checks", "session", session1) check, err := tx.First(tableSessionChecks, indexSession, Query{Value: session1})
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -730,7 +730,7 @@ func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) {
// Manually make sure the session checks mapping is clear. // Manually make sure the session checks mapping is clear.
tx := s.db.Txn(false) tx := s.db.Txn(false)
mapping, err := tx.First("session_checks", "session", session.ID) mapping, err := tx.First(tableSessionChecks, indexSession, Query{Value: session.ID})
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

View File

@ -1865,6 +1865,18 @@ type CheckID struct {
EnterpriseMeta EnterpriseMeta
} }
// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (c CheckID) NamespaceOrDefault() string {
return c.EnterpriseMeta.NamespaceOrDefault()
}
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (c CheckID) PartitionOrDefault() string {
return c.EnterpriseMeta.PartitionOrDefault()
}
func NewCheckID(id types.CheckID, entMeta *EnterpriseMeta) CheckID { func NewCheckID(id types.CheckID, entMeta *EnterpriseMeta) CheckID {
var cid CheckID var cid CheckID
cid.ID = id cid.ID = id