mirror of
https://github.com/status-im/consul.git
synced 2025-02-13 14:16:35 +00:00
state: convert nodes.ID to new functional pattern
In preparation for adding other identifiers to the index.
This commit is contained in:
parent
340d714fd6
commit
a4e68e32d6
@ -29,6 +29,13 @@ const (
|
|||||||
minUUIDLookupLen = 2
|
minUUIDLookupLen = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Query is type used to query any single value index that may include an
|
||||||
|
// enterprise identifier.
|
||||||
|
type Query struct {
|
||||||
|
Value string
|
||||||
|
structs.EnterpriseMeta
|
||||||
|
}
|
||||||
|
|
||||||
func resizeNodeLookupKey(s string) string {
|
func resizeNodeLookupKey(s string) string {
|
||||||
l := len(s)
|
l := len(s)
|
||||||
|
|
||||||
@ -41,7 +48,7 @@ func resizeNodeLookupKey(s string) string {
|
|||||||
|
|
||||||
// Nodes is used to pull the full list of nodes for use during snapshots.
|
// Nodes is used to pull the full list of nodes for use during snapshots.
|
||||||
func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("nodes", "id")
|
iter, err := s.tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -128,7 +135,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||||||
// modify the node at all so we prevent watch churn and useless writes
|
// modify the node at all so we prevent watch churn and useless writes
|
||||||
// and modify index bumps on the node.
|
// and modify index bumps on the node.
|
||||||
{
|
{
|
||||||
existing, err := tx.First("nodes", "id", node.Node)
|
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("node lookup failed: %s", err)
|
return fmt.Errorf("node lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -187,7 +194,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
|
|||||||
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
|
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
|
||||||
func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
|
func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
|
||||||
// Retrieve all of the nodes
|
// Retrieve all of the nodes
|
||||||
enodes, err := tx.Get("nodes", "id")
|
enodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot lookup all nodes: %s", err)
|
return fmt.Errorf("Cannot lookup all nodes: %s", err)
|
||||||
}
|
}
|
||||||
@ -289,7 +296,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
|
|||||||
|
|
||||||
// Check for an existing node by name to support nodes with no IDs.
|
// Check for an existing node by name to support nodes with no IDs.
|
||||||
if n == nil {
|
if n == nil {
|
||||||
existing, err := tx.First("nodes", "id", node.Node)
|
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("node name lookup failed: %s", err)
|
return fmt.Errorf("node name lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -354,7 +361,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
|
func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
|
||||||
node, err := tx.First("nodes", "id", nodeName)
|
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("node lookup failed: %s", err)
|
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -403,7 +410,7 @@ func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
|
|||||||
idx := maxIndexTxn(tx, "nodes")
|
idx := maxIndexTxn(tx, "nodes")
|
||||||
|
|
||||||
// Retrieve all of the nodes
|
// Retrieve all of the nodes
|
||||||
nodes, err := tx.Get("nodes", "id")
|
nodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -493,7 +500,7 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string)
|
|||||||
// the store within a given transaction.
|
// the store within a given transaction.
|
||||||
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
||||||
// Look up the node.
|
// Look up the node.
|
||||||
node, err := tx.First("nodes", "id", nodeName)
|
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("node lookup failed: %s", err)
|
return fmt.Errorf("node lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -654,7 +661,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||||||
// That's always populated when we read from the state store.
|
// That's always populated when we read from the state store.
|
||||||
entry := svc.ToServiceNode(node)
|
entry := svc.ToServiceNode(node)
|
||||||
// Get the node
|
// Get the node
|
||||||
n, err := tx.First("nodes", "id", node)
|
n, err := tx.First(tableNodes, indexID, Query{Value: node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed node lookup: %s", err)
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1082,7 +1089,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
|
|||||||
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
||||||
// We don't want to track an unlimited number of nodes, so we pull a
|
// We don't want to track an unlimited number of nodes, so we pull a
|
||||||
// top-level watch to use as a fallback.
|
// top-level watch to use as a fallback.
|
||||||
allNodes, err := tx.Get("nodes", "id")
|
allNodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed nodes lookup: %s", err)
|
return nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1097,7 +1104,7 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo
|
|||||||
s := sn.PartialClone()
|
s := sn.PartialClone()
|
||||||
|
|
||||||
// Grab the corresponding node record.
|
// Grab the corresponding node record.
|
||||||
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
|
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed node lookup: %s", err)
|
return nil, fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1158,7 +1165,7 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
|
|||||||
idx := catalogMaxIndex(tx, entMeta, false)
|
idx := catalogMaxIndex(tx, entMeta, false)
|
||||||
|
|
||||||
// Query the node by node name
|
// Query the node by node name
|
||||||
watchCh, n, err := tx.FirstWatch("nodes", "id", nodeNameOrID)
|
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, 0, nil, nil, fmt.Errorf("node lookup failed: %s", err)
|
return true, 0, nil, nil, fmt.Errorf("node lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -1476,7 +1483,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the node
|
// Get the node
|
||||||
node, err := tx.First("nodes", "id", hc.Node)
|
node, err := tx.First(tableNodes, indexID, Query{Value: hc.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed node lookup: %s", err)
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1702,7 +1709,7 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
|
|||||||
|
|
||||||
// We don't want to track an unlimited number of nodes, so we pull a
|
// We don't want to track an unlimited number of nodes, so we pull a
|
||||||
// top-level watch to use as a fallback.
|
// top-level watch to use as a fallback.
|
||||||
allNodes, err := tx.Get("nodes", "id")
|
allNodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1712,7 +1719,7 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
|
|||||||
var results structs.HealthChecks
|
var results structs.HealthChecks
|
||||||
for check := iter.Next(); check != nil; check = iter.Next() {
|
for check := iter.Next(); check != nil; check = iter.Next() {
|
||||||
healthCheck := check.(*structs.HealthCheck)
|
healthCheck := check.(*structs.HealthCheck)
|
||||||
watchCh, node, err := tx.FirstWatch("nodes", "id", healthCheck.Node)
|
watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{Value: healthCheck.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2117,7 +2124,7 @@ func parseCheckServiceNodes(
|
|||||||
|
|
||||||
// We don't want to track an unlimited number of nodes, so we pull a
|
// We don't want to track an unlimited number of nodes, so we pull a
|
||||||
// top-level watch to use as a fallback.
|
// top-level watch to use as a fallback.
|
||||||
allNodes, err := tx.Get("nodes", "id")
|
allNodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2135,7 +2142,7 @@ func parseCheckServiceNodes(
|
|||||||
results := make(structs.CheckServiceNodes, 0, len(services))
|
results := make(structs.CheckServiceNodes, 0, len(services))
|
||||||
for _, sn := range services {
|
for _, sn := range services {
|
||||||
// Retrieve the node.
|
// Retrieve the node.
|
||||||
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
|
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2191,7 +2198,7 @@ func (s *Store) NodeInfo(ws memdb.WatchSet, node string, entMeta *structs.Enterp
|
|||||||
idx := catalogMaxIndex(tx, entMeta, true)
|
idx := catalogMaxIndex(tx, entMeta, true)
|
||||||
|
|
||||||
// Query the node by the passed node
|
// Query the node by the passed node
|
||||||
nodes, err := tx.Get("nodes", "id", node)
|
nodes, err := tx.Get(tableNodes, indexID, Query{Value: node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2210,7 +2217,7 @@ func (s *Store) NodeDump(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
|
|||||||
idx := catalogMaxIndex(tx, entMeta, true)
|
idx := catalogMaxIndex(tx, entMeta, true)
|
||||||
|
|
||||||
// Fetch all of the registered nodes
|
// Fetch all of the registered nodes
|
||||||
nodes, err := tx.Get("nodes", "id")
|
nodes, err := tx.Get(tableNodes, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -384,7 +384,7 @@ func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]strea
|
|||||||
// the full list of checks for a specific service on that node.
|
// the full list of checks for a specific service on that node.
|
||||||
func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc, error) {
|
func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc, error) {
|
||||||
// Fetch the node
|
// Fetch the node
|
||||||
nodeRaw, err := tx.First("nodes", "id", node)
|
nodeRaw, err := tx.First(tableNodes, indexID, Query{Value: node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,34 @@ func indexFromNodeServiceQuery(arg interface{}) ([]byte, error) {
|
|||||||
return b.Bytes(), nil
|
return b.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func indexFromNode(raw interface{}) ([]byte, error) {
|
||||||
|
n, ok := raw.(*structs.Node)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for structs.Node index", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.Node == "" {
|
||||||
|
return nil, errMissingValueForIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(n.Node))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// indexFromNodeQuery builds an index key where Query.Value is lowercase, and is
|
||||||
|
// a required value.
|
||||||
|
func indexFromNodeQuery(arg interface{}) ([]byte, error) {
|
||||||
|
q, ok := arg.(Query)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for Query index", arg)
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(q.Value))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||||
return fmt.Sprintf("service.%s", name)
|
return fmt.Sprintf("service.%s", name)
|
||||||
}
|
}
|
||||||
|
@ -24,3 +24,18 @@ func testIndexerTableChecks() map[string]indexerTestCase {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testIndexerTableNodes() map[string]indexerTestCase {
|
||||||
|
return map[string]indexerTestCase{
|
||||||
|
indexID: {
|
||||||
|
read: indexValue{
|
||||||
|
source: Query{Value: "NoDeId"},
|
||||||
|
expected: []byte("nodeid\x00"),
|
||||||
|
},
|
||||||
|
write: indexValue{
|
||||||
|
source: &structs.Node{Node: "NoDeId"},
|
||||||
|
expected: []byte("nodeid\x00"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -25,8 +25,7 @@ const (
|
|||||||
indexNodeService = "node_service"
|
indexNodeService = "node_service"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nodesTableSchema returns a new table schema used for storing node
|
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
||||||
// information.
|
|
||||||
func nodesTableSchema() *memdb.TableSchema {
|
func nodesTableSchema() *memdb.TableSchema {
|
||||||
return &memdb.TableSchema{
|
return &memdb.TableSchema{
|
||||||
Name: tableNodes,
|
Name: tableNodes,
|
||||||
@ -35,18 +34,16 @@ func nodesTableSchema() *memdb.TableSchema {
|
|||||||
Name: indexID,
|
Name: indexID,
|
||||||
AllowMissing: false,
|
AllowMissing: false,
|
||||||
Unique: true,
|
Unique: true,
|
||||||
Indexer: &memdb.StringFieldIndex{
|
Indexer: indexerSingle{
|
||||||
Field: "Node",
|
readIndex: readIndex(indexFromNodeQuery),
|
||||||
Lowercase: true,
|
writeIndex: writeIndex(indexFromNode),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"uuid": {
|
"uuid": {
|
||||||
Name: "uuid",
|
Name: "uuid",
|
||||||
AllowMissing: true,
|
AllowMissing: true,
|
||||||
Unique: true,
|
Unique: true,
|
||||||
Indexer: &memdb.UUIDFieldIndex{
|
Indexer: &memdb.UUIDFieldIndex{Field: "ID"},
|
||||||
Field: "ID",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
"meta": {
|
"meta": {
|
||||||
Name: "meta",
|
Name: "meta",
|
||||||
|
@ -146,7 +146,7 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
|
|||||||
// don't carefully sequence this, and since it will fix itself
|
// don't carefully sequence this, and since it will fix itself
|
||||||
// on the next coordinate update from that node, we don't return
|
// on the next coordinate update from that node, we don't return
|
||||||
// an error or log anything.
|
// an error or log anything.
|
||||||
node, err := tx.First("nodes", "id", update.Node)
|
node, err := tx.First(tableNodes, indexID, Query{Value: update.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed node lookup: %s", err)
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -129,6 +129,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
|||||||
|
|
||||||
var testcases = map[string]func() map[string]indexerTestCase{
|
var testcases = map[string]func() map[string]indexerTestCase{
|
||||||
tableChecks: testIndexerTableChecks,
|
tableChecks: testIndexerTableChecks,
|
||||||
|
tableNodes: testIndexerTableNodes,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, table := range schema.Tables {
|
for _, table := range schema.Tables {
|
||||||
|
@ -195,7 +195,7 @@ func sessionCreateTxn(tx *txn, idx uint64, sess *structs.Session) error {
|
|||||||
sess.ModifyIndex = idx
|
sess.ModifyIndex = idx
|
||||||
|
|
||||||
// Check that the node exists
|
// Check that the node exists
|
||||||
node, err := tx.First("nodes", "id", sess.Node)
|
node, err := tx.First(tableNodes, indexID, Query{Value: sess.Node})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed node lookup: %s", err)
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,7 @@ func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string,
|
|||||||
|
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
n, err := tx.First("nodes", "id", nodeID)
|
n, err := tx.First(tableNodes, indexID, Query{Value: nodeID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ table=mesh-topology
|
|||||||
|
|
||||||
table=nodes
|
table=nodes
|
||||||
index=id unique
|
index=id unique
|
||||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
|
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNode
|
||||||
index=meta allow-missing
|
index=meta allow-missing
|
||||||
indexer=github.com/hashicorp/go-memdb.StringMapFieldIndex Field=Meta Lowercase=false
|
indexer=github.com/hashicorp/go-memdb.StringMapFieldIndex Field=Meta Lowercase=false
|
||||||
index=uuid unique allow-missing
|
index=uuid unique allow-missing
|
||||||
|
Loading…
x
Reference in New Issue
Block a user