diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index f60579a63d..f0bf123e39 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2562,7 +2562,8 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, // checkSessionsTxn returns the IDs of all sessions associated with a health check func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) { - mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) + mappings, err := tx.Get(tableSessionChecks, indexNodeCheck, MultiQuery{Value: []string{hc.Node, string(hc.CheckID)}, + EnterpriseMeta: hc.EnterpriseMeta}) if err != nil { return nil, fmt.Errorf("failed session checks lookup: %s", err) } diff --git a/agent/consul/state/indexer.go b/agent/consul/state/indexer.go index 8f9c7bbe8c..7fa30a7d54 100644 --- a/agent/consul/state/indexer.go +++ b/agent/consul/state/indexer.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "strings" "time" "github.com/hashicorp/consul/agent/structs" @@ -145,11 +146,50 @@ type singleValueID interface { NamespaceOrDefault() string } +type multiValueID interface { + IDValue() []string + PartitionOrDefault() string + NamespaceOrDefault() string +} + var _ singleValueID = (*structs.DirEntry)(nil) var _ singleValueID = (*Tombstone)(nil) var _ singleValueID = (*Query)(nil) var _ singleValueID = (*structs.Session)(nil) +// indexFromIDValue creates an index key from any struct that implements singleValueID +func indexFromIDValueLowerCase(raw interface{}) ([]byte, error) { + e, ok := raw.(singleValueID) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw) + } + + v := strings.ToLower(e.IDValue()) + if v == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(v) + return b.Bytes(), nil +} + +// indexFromIDValue creates an index key from any struct that implements singleValueID +func indexFromMultiValueID(raw interface{}) ([]byte, error) { + e, ok := raw.(multiValueID) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement multiValueID", raw) + } + var b indexBuilder + for _, v := range e.IDValue() { + if v == "" { + return nil, errMissingValueForIndex + } + b.String(strings.ToLower(v)) + } + return b.Bytes(), nil +} + func (b *indexBuilder) Bool(v bool) { b.Raw([]byte{intFromBool(v)}) } diff --git a/agent/consul/state/kvs_oss.go b/agent/consul/state/kvs_oss.go index 75eb217df1..096d019f98 100644 --- a/agent/consul/state/kvs_oss.go +++ b/agent/consul/state/kvs_oss.go @@ -39,23 +39,6 @@ func prefixIndexForIDValue(arg interface{}) ([]byte, error) { return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg) } -func prefixIndexForKVEntry(arg interface{}) ([]byte, error) { - var b indexBuilder - switch v := arg.(type) { - // DeletePrefix always uses a string, pass it along unmodified - case string: - return []byte(v), nil - case structs.EnterpriseMeta: - return nil, nil - case singleValueID: - // Omit null terminator, because we want to prefix match keys - (*bytes.Buffer)(&b).WriteString(v.IDValue()) - return b.Bytes(), nil - } - - return nil, fmt.Errorf("unexpected type %T for singleValueID prefix index", arg) -} - func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) error { if err := tx.Insert(tableKVs, entry); err != nil { return err diff --git a/agent/consul/state/query.go b/agent/consul/state/query.go index 58d29988db..7e08384488 100644 --- a/agent/consul/state/query.go +++ b/agent/consul/state/query.go @@ -31,6 +31,27 @@ func (q Query) PartitionOrDefault() string { return q.EnterpriseMeta.PartitionOrDefault() } +type MultiQuery struct { + Value []string + structs.EnterpriseMeta +} + +func (q MultiQuery) IDValue() []string { + return q.Value +} + +// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q MultiQuery) NamespaceOrDefault() string { + return q.EnterpriseMeta.NamespaceOrDefault() +} + +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q MultiQuery) PartitionOrDefault() string { + return q.EnterpriseMeta.PartitionOrDefault() +} + // indexFromQuery builds an index key where Query.Value is lowercase, and is // a required value. func indexFromQuery(arg interface{}) ([]byte, error) { diff --git a/agent/consul/state/session.go b/agent/consul/state/session.go index 58e39f9c6c..eb9af32f60 100644 --- a/agent/consul/state/session.go +++ b/agent/consul/state/session.go @@ -12,13 +12,16 @@ import ( ) const ( - tableSessions = "sessions" + tableSessions = "sessions" + tableSessionChecks = "session_checks" + + indexNodeCheck = "node_check" ) func indexFromSession(raw interface{}) ([]byte, error) { e, ok := raw.(*structs.Session) if !ok { - return nil, fmt.Errorf("unexpected type %T, does not implement singleValueID", raw) + return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw) } v := strings.ToLower(e.ID) @@ -57,43 +60,93 @@ func sessionsTableSchema() *memdb.TableSchema { // checks. func sessionChecksTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "session_checks", + Name: tableSessionChecks, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &CheckIDIndex{}, - &memdb.UUIDFieldIndex{ - Field: "Session", - }, - }, - }, + Indexer: idCheckIndexer(), }, - "node_check": { - Name: "node_check", + indexNodeCheck: { + Name: indexNodeCheck, AllowMissing: false, Unique: false, Indexer: nodeChecksIndexer(), }, - "session": { - Name: "session", + indexSession: { + Name: indexSession, AllowMissing: false, Unique: false, - Indexer: &memdb.UUIDFieldIndex{ - Field: "Session", - }, + Indexer: sessionCheckIndexer(), }, }, } } +// indexNodeFromSession creates an index key from *structs.Session +func indexNodeFromSession(raw interface{}) ([]byte, error) { + e, ok := raw.(*structs.Session) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw) + } + + v := strings.ToLower(e.Node) + if v == "" { + return nil, errMissingValueForIndex + } + var b indexBuilder + + b.String(v) + return b.Bytes(), nil +} + +// indexFromNodeCheckIDSession creates an index key from sessionCheck +func indexFromNodeCheckIDSession(raw interface{}) ([]byte, error) { + e, ok := raw.(*sessionCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement sessionCheck", raw) + } + + var b indexBuilder + v := strings.ToLower(e.Node) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + v = strings.ToLower(string(e.CheckID.ID)) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + v = strings.ToLower(e.Session) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + return b.Bytes(), nil +} + +// indexSessionCheckFromSession creates an index key from sessionCheck +func indexSessionCheckFromSession(raw interface{}) ([]byte, error) { + e, ok := raw.(*sessionCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement *sessionCheck", raw) + } + + var b indexBuilder + v := strings.ToLower(e.Session) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + return b.Bytes(), nil +} + type CheckIDIndex struct { } @@ -371,8 +424,11 @@ func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entM return fmt.Errorf("unknown session behavior %#v", session.Behavior) } + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() + } // Delete any check mappings. - mappings, err := tx.Get("session_checks", "session", sessionID) + mappings, err := tx.Get(tableSessionChecks, indexSession, Query{Value: sessionID, EnterpriseMeta: *entMeta}) if err != nil { return fmt.Errorf("failed session checks lookup: %s", err) } @@ -384,7 +440,7 @@ func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entM // Do the delete in a separate loop so we don't trash the iterator. for _, obj := range objs { - if err := tx.Delete("session_checks", obj); err != nil { + if err := tx.Delete(tableSessionChecks, obj); err != nil { return fmt.Errorf("failed deleting session check: %s", err) } } diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index 5866190c6e..478cd5b0fa 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -5,6 +5,7 @@ package state import ( "fmt" + "strings" "github.com/hashicorp/go-memdb" @@ -20,25 +21,56 @@ func sessionIndexer() indexerSingleWithPrefix { } } -func nodeSessionsIndexer() *memdb.StringFieldIndex { - return &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, +func nodeSessionsIndexer() indexerSingle { + return indexerSingle{ + readIndex: readIndex(indexFromIDValueLowerCase), + writeIndex: writeIndex(indexNodeFromSession), } } -func nodeChecksIndexer() *memdb.CompoundIndex { - return &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &CheckIDIndex{}, - }, +func idCheckIndexer() indexerSingle { + return indexerSingle{ + readIndex: indexFromNodeCheckIDSession, + writeIndex: indexFromNodeCheckIDSession, } } +func sessionCheckIndexer() indexerSingle { + return indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexSessionCheckFromSession, + } +} + +func nodeChecksIndexer() indexerSingle { + return indexerSingle{ + readIndex: indexFromMultiValueID, + writeIndex: indexFromNodeCheckID, + } +} + +// indexFromNodeCheckID creates an index key from a sessionCheck structure +func indexFromNodeCheckID(raw interface{}) ([]byte, error) { + e, ok := raw.(*sessionCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T, does not implement *structs.Session", raw) + } + var b indexBuilder + v := strings.ToLower(e.Node) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + v = strings.ToLower(string(e.CheckID.ID)) + if v == "" { + return nil, errMissingValueForIndex + } + b.String(v) + + return b.Bytes(), nil +} + func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error { if err := tx.Delete(tableSessions, session); err != nil { return fmt.Errorf("failed deleting session: %s", err) @@ -64,7 +96,7 @@ func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateM CheckID: structs.CheckID{ID: checkID}, Session: session.ID, } - if err := tx.Insert("session_checks", mapping); err != nil { + if err := tx.Insert(tableSessionChecks, mapping); err != nil { return fmt.Errorf("failed inserting session check mapping: %s", err) } } @@ -91,7 +123,7 @@ func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) { func nodeSessionsTxn(tx ReadTxn, ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { - sessions, err := tx.Get(tableSessions, indexNode, node) + sessions, err := tx.Get(tableSessions, indexNode, Query{Value: node}) if err != nil { return nil, fmt.Errorf("failed session lookup: %s", err) } diff --git a/agent/consul/state/session_test.go b/agent/consul/state/session_test.go index c81d7dc554..daeefc4912 100644 --- a/agent/consul/state/session_test.go +++ b/agent/consul/state/session_test.go @@ -143,7 +143,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { // Check mappings were inserted { - check, err := tx.First("session_checks", "session", sess.ID) + check, err := tx.First(tableSessionChecks, indexSession, Query{Value: sess.ID}) if err != nil { t.Fatalf("err: %s", err) } @@ -174,7 +174,7 @@ func TestStateStore_SessionCreate_SessionGet(t *testing.T) { t.Fatalf("err: %s", err) } - checks, err := tx.Get("session_checks", "session", sess2.ID) + checks, err := tx.Get(tableSessionChecks, indexSession, Query{Value: sess2.ID}) if err != nil { t.Fatalf("err: %s", err) } @@ -509,7 +509,7 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) { tx := s.db.Txn(false) defer tx.Abort() - check, err := tx.First("session_checks", "session", session1) + check, err := tx.First(tableSessionChecks, indexSession, Query{Value: session1}) if err != nil { t.Fatalf("err: %s", err) } @@ -730,7 +730,7 @@ func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) { // Manually make sure the session checks mapping is clear. tx := s.db.Txn(false) - mapping, err := tx.First("session_checks", "session", session.ID) + mapping, err := tx.First(tableSessionChecks, indexSession, Query{Value: session.ID}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index cef5fd13b9..17270b986c 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1865,6 +1865,18 @@ type CheckID struct { EnterpriseMeta } +// NamespaceOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (c CheckID) NamespaceOrDefault() string { + return c.EnterpriseMeta.NamespaceOrDefault() +} + +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (c CheckID) PartitionOrDefault() string { + return c.EnterpriseMeta.PartitionOrDefault() +} + func NewCheckID(id types.CheckID, entMeta *EnterpriseMeta) CheckID { var cid CheckID cid.ID = id