state: partition nodes and coordinates in the state store (#10859)

Additionally:

- partitioned the catalog indexes appropriately for partitioning
- removed a stray reference to a non-existent index named "node.checks"
This commit is contained in:
R.B. Boyer 2021-08-17 13:29:39 -05:00 committed by GitHub
parent 540f88d622
commit 310e775a8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 723 additions and 324 deletions

View File

@ -56,6 +56,7 @@ func TestFSM_RegisterNode(t *testing.T) {
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(),
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
@ -114,6 +115,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
Status: api.HealthPassing,
ServiceID: "db",
},
EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(),
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
@ -713,10 +715,12 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Coord: generateRandomCoordinate(),
},
}
@ -734,9 +738,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
require.Equal(t, updates, coords)
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {

View File

@ -96,6 +96,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
// Register each node
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
nodeEntMeta := n.GetEnterpriseMeta()
req := structs.RegisterRequest{
ID: n.ID,
Node: n.Node,
@ -104,6 +106,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
EnterpriseMeta: *nodeEntMeta,
}
// Register the node itself
@ -115,8 +118,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
}
// Register each service this node has
// TODO(partitions)
services, err := s.state.Services(n.Node, nil)
services, err := s.state.Services(n.Node, nodeEntMeta)
if err != nil {
return err
}
@ -132,8 +134,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has
req.Service = nil
// TODO(partitions)
checks, err := s.state.Checks(n.Node, nil)
checks, err := s.state.Checks(n.Node, nodeEntMeta)
if err != nil {
return err
}

View File

@ -876,6 +876,7 @@ func TestInternal_GatewayServiceDump_Terminating(t *testing.T) {
{
Node: &structs.Node{
Node: "baz",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Address: "127.0.0.3",
Datacenter: "dc1",
},
@ -908,6 +909,7 @@ func TestInternal_GatewayServiceDump_Terminating(t *testing.T) {
{
Node: &structs.Node{
Node: "bar",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Address: "127.0.0.2",
Datacenter: "dc1",
},
@ -1215,6 +1217,7 @@ func TestInternal_GatewayServiceDump_Ingress(t *testing.T) {
{
Node: &structs.Node{
Node: "bar",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Address: "127.0.0.2",
Datacenter: "dc1",
},
@ -1250,6 +1253,7 @@ func TestInternal_GatewayServiceDump_Ingress(t *testing.T) {
{
Node: &structs.Node{
Node: "baz",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Address: "127.0.0.3",
Datacenter: "dc1",
},

View File

@ -49,16 +49,28 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
// Services is used to pull the full list of services for a given node for use
// during snapshots.
func (s *Snapshot) Services(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableServices, indexNode, Query{Value: node})
func (s *Snapshot) Services(node string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
return s.tx.Get(tableServices, indexNode, Query{
Value: node,
EnterpriseMeta: *entMeta,
})
}
// Checks is used to pull the full list of checks for a given node for use
// during snapshots.
func (s *Snapshot) Checks(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableChecks, indexNode, Query{Value: node})
func (s *Snapshot) Checks(node string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
return s.tx.Get(tableChecks, indexNode, Query{
Value: node,
EnterpriseMeta: *entMeta,
})
}
// Registration is used to make sure a node, service, and check registration is
@ -83,6 +95,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
}
func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
// TODO(partitions): do we have to check partition here? probably not
if check.Node != node {
return fmt.Errorf("check node %q does not match node %q",
check.Node, node)
@ -107,6 +120,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
Node: req.Node,
Address: req.Address,
Datacenter: req.Datacenter,
Partition: req.PartitionOrDefault(),
TaggedAddresses: req.TaggedAddresses,
Meta: req.NodeMeta,
}
@ -121,7 +135,10 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
// modify the node at all so we prevent watch churn and useless writes
// and modify index bumps on the node.
{
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
existing, err := tx.First(tableNodes, indexID, Query{
Value: node.Node,
EnterpriseMeta: *node.GetEnterpriseMeta(),
})
if err != nil {
return fmt.Errorf("node lookup failed: %s", err)
}
@ -136,7 +153,11 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
// node info above to make sure we actually need to update the service
// definition in order to prevent useless churn if nothing has changed.
if req.Service != nil {
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{
EnterpriseMeta: req.Service.EnterpriseMeta,
Node: req.Node,
Service: req.Service.ID,
})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -180,7 +201,8 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
// Retrieve all of the nodes
enodes, err := tx.Get(tableNodes, indexID)
enodes, err := tx.Get(tableNodes, indexID+"_prefix", node.GetEnterpriseMeta())
if err != nil {
return fmt.Errorf("Cannot lookup all nodes: %s", err)
}
@ -189,7 +211,11 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
// Look up the existing node's Serf health check to see if it's failed.
// If it is, the node can be renamed.
enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{
EnterpriseMeta: *node.GetEnterpriseMeta(),
Node: enode.Node,
CheckID: string(structs.SerfCheckID),
})
if err != nil {
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
}
@ -216,7 +242,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureNodeCASTxn(tx WriteTxn, idx uint64, node *structs.Node) (bool, error) {
// Retrieve the existing entry.
existing, err := getNodeTxn(tx, node.Node)
existing, err := getNodeTxn(tx, node.Node, node.GetEnterpriseMeta())
if err != nil {
return false, err
}
@ -249,6 +275,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
// name is the same.
var n *structs.Node
if node.ID != "" {
// TODO(partitions): should this take a node ent-meta?
existing, err := getNodeIDTxn(tx, node.ID)
if err != nil {
return fmt.Errorf("node lookup failed: %s", err)
@ -262,7 +289,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
return fmt.Errorf("Error while renaming Node ID: %q (%s): %s", node.ID, node.Address, dupNameError)
}
// We are actually renaming a node, remove its reference first
err := s.deleteNodeTxn(tx, idx, n.Node)
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta())
if err != nil {
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s",
node.ID, node.Address, n.Node, node.Node)
@ -282,7 +309,10 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
// Check for an existing node by name to support nodes with no IDs.
if n == nil {
existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node})
existing, err := tx.First(tableNodes, indexID, Query{
Value: node.Node,
EnterpriseMeta: *node.GetEnterpriseMeta(),
})
if err != nil {
return fmt.Errorf("node name lookup failed: %s", err)
}
@ -314,41 +344,35 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
}
// Insert the node and update the index.
if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// Update the node's service indexes as the node information is included
// in health queries and we would otherwise miss node updates in some cases
// for those queries.
if err := updateAllServiceIndexesOfNode(tx, idx, node.Node); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
return catalogInsertNode(tx, node)
}
// GetNode is used to retrieve a node registration by node name ID.
func (s *Store) GetNode(nodeNameOrID string, _ *structs.EnterpriseMeta) (uint64, *structs.Node, error) {
// TODO(partitions): use the provided entmeta
func (s *Store) GetNode(nodeNameOrID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.Node, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := maxIndexTxn(tx, "nodes")
idx := catalogNodesMaxIndex(tx, entMeta)
// Retrieve the node from the state store
node, err := getNodeTxn(tx, nodeNameOrID)
node, err := getNodeTxn(tx, nodeNameOrID, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
return idx, node, nil
}
func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
func getNodeTxn(tx ReadTxn, nodeNameOrID string, entMeta *structs.EnterpriseMeta) (*structs.Node, error) {
node, err := tx.First(tableNodes, indexID, Query{
Value: nodeNameOrID,
EnterpriseMeta: *entMeta,
})
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
}
@ -365,7 +389,7 @@ func getNodeIDTxn(tx ReadTxn, id types.NodeID) (*structs.Node, error) {
return nil, fmt.Errorf("node lookup by ID failed, wrong UUID: %v for '%s'", err, strnode)
}
node, err := tx.First("nodes", "uuid", uuidValue)
node, err := tx.First(tableNodes, "uuid", uuidValue)
if err != nil {
return nil, fmt.Errorf("node lookup by ID failed: %s", err)
}
@ -381,7 +405,9 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes")
///
// NOTE: nodeIDs aren't partitioned so don't use the convenience function.
idx := maxIndexTxn(tx, tableNodes)
// Retrieve the node from the state store
node, err := getNodeIDTxn(tx, id)
@ -389,16 +415,20 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
}
// Nodes is used to return all of the known nodes.
func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
func (s *Store) Nodes(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := maxIndexTxn(tx, "nodes")
idx := catalogNodesMaxIndex(tx, entMeta)
// Retrieve all of the nodes
nodes, err := tx.Get(tableNodes, indexID)
nodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -413,13 +443,20 @@ func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, str
}
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, entMeta *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := maxIndexTxn(tx, "nodes")
idx := maxIndexTxn(tx, tableNodes)
// TODO:(partitions) use the partitioned meta index
// idx := catalogNodesMaxIndex(tx, entMeta)
_ = entMeta
// Retrieve all of the nodes
var args []interface{}
@ -427,7 +464,8 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *str
args = append(args, key, value)
break
}
nodes, err := tx.Get("nodes", "meta", args...)
nodes, err := tx.Get(tableNodes, "meta", args...)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -445,13 +483,20 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *str
}
// DeleteNode is used to delete a given node by its ID.
func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMeta) error {
// TODO(partitions): use the provided entmeta
func (s *Store) DeleteNode(idx uint64, nodeName string, entMeta *structs.EnterpriseMeta) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
// TODO(partition): double check all freshly modified state store functions
// that take an ent meta do this trick
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Call the node deletion.
if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil {
if err := s.deleteNodeTxn(tx, idx, nodeName, entMeta); err != nil {
return err
}
@ -461,9 +506,9 @@ func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMet
// deleteNodeCASTxn is used to try doing a node delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) (bool, error) {
func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Look up the node.
node, err := getNodeTxn(tx, nodeName)
node, err := getNodeTxn(tx, nodeName, entMeta)
if err != nil {
return false, err
}
@ -479,7 +524,7 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string)
}
// Call the actual deletion if the above passed.
if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil {
if err := s.deleteNodeTxn(tx, idx, nodeName, entMeta); err != nil {
return false, err
}
@ -488,9 +533,17 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string)
// deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction.
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta *structs.EnterpriseMeta) error {
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
// Look up the node.
node, err := tx.First(tableNodes, indexID, Query{Value: nodeName})
node, err := tx.First(tableNodes, indexID, Query{
Value: nodeName,
EnterpriseMeta: *entMeta,
})
if err != nil {
return fmt.Errorf("node lookup failed: %s", err)
}
@ -499,7 +552,10 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
}
// Delete all services associated with the node and update the service index.
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeName})
services, err := tx.Get(tableServices, indexNode, Query{
Value: nodeName,
EnterpriseMeta: *entMeta,
})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -525,7 +581,10 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
// Delete all checks associated with the node. This will invalidate
// sessions as necessary.
checks, err := tx.Get(tableChecks, indexNode, Query{Value: nodeName})
checks, err := tx.Get(tableChecks, indexNode, Query{
Value: nodeName,
EnterpriseMeta: *entMeta,
})
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}
@ -542,28 +601,28 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
}
// Delete any coordinates associated with this node.
coords, err := tx.Get("coordinates", "node", nodeName)
coords, err := tx.Get(tableCoordinates, indexNode, Query{
Value: nodeName,
EnterpriseMeta: *entMeta,
})
if err != nil {
return fmt.Errorf("failed coordinate lookup: %s", err)
}
var coordsToDelete []interface{}
var coordsToDelete []*structs.Coordinate
for coord := coords.Next(); coord != nil; coord = coords.Next() {
coordsToDelete = append(coordsToDelete, coord)
coordsToDelete = append(coordsToDelete, coord.(*structs.Coordinate))
}
for _, coord := range coordsToDelete {
if err := tx.Delete("coordinates", coord); err != nil {
if err := deleteCoordinateTxn(tx, idx, coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err)
}
if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
// Delete the node and update the index.
if err := tx.Delete("nodes", node); err != nil {
if err := tx.Delete(tableNodes, node); err != nil {
return fmt.Errorf("failed deleting node: %s", err)
}
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
if err := catalogUpdateNodesIndexes(tx, idx, entMeta); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
@ -626,7 +685,11 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
// existing memdb transaction.
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{
EnterpriseMeta: svc.EnterpriseMeta,
Node: node,
Service: svc.ID,
})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -651,7 +714,10 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
// Get the node
n, err := tx.First(tableNodes, indexID, Query{Value: node})
n, err := tx.First(tableNodes, indexID, Query{
Value: node,
EnterpriseMeta: svc.EnterpriseMeta,
})
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
@ -765,7 +831,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
// Get the table index.
idx := catalogServicesMaxIndex(tx, entMeta)
if nodeIdx := maxIndexTxn(tx, "nodes"); nodeIdx > idx {
if nodeIdx := catalogNodesMaxIndex(tx, entMeta); nodeIdx > idx {
idx = nodeIdx
}
@ -775,7 +841,8 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
args = append(args, key, value)
break
}
nodes, err := tx.Get("nodes", "meta", args...)
// TODO(partitions): scope the meta index to a partition
nodes, err := tx.Get(tableNodes, "meta", args...)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -971,7 +1038,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, index string, q Query) (uint
}
// Fill in the node details.
results, err = parseServiceNodes(tx, ws, results)
results, err = parseServiceNodes(tx, ws, results, &q.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
@ -1017,7 +1084,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
}
// Fill in the node details.
results, err = parseServiceNodes(tx, ws, results)
results, err = parseServiceNodes(tx, ws, results, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
@ -1087,7 +1154,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
}
// Fill in the node details.
results, err = parseServiceNodes(tx, ws, results)
results, err = parseServiceNodes(tx, ws, results, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
@ -1096,10 +1163,10 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes, entMeta *structs.EnterpriseMeta) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get(tableNodes, indexID)
allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta)
if err != nil {
return nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -1114,7 +1181,10 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo
s := sn.PartialClone()
// Grab the corresponding node record.
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: sn.Node,
EnterpriseMeta: sn.EnterpriseMeta,
})
if err != nil {
return nil, fmt.Errorf("failed node lookup: %s", err)
}
@ -1128,6 +1198,7 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo
s.Address = node.Address
s.Datacenter = node.Datacenter
s.TaggedAddresses = node.TaggedAddresses
s.EnterpriseMeta.Merge(node.GetEnterpriseMeta())
s.NodeMeta = node.Meta
results = append(results, s)
@ -1160,7 +1231,11 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
}
// Query the service
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{
EnterpriseMeta: *entMeta,
Node: nodeName,
Service: serviceID,
})
if err != nil {
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
@ -1176,11 +1251,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
tx := s.db.Txn(false)
defer tx.Abort()
// TODO: accept non-pointer value for entMeta
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := catalogMaxIndex(tx, entMeta, false)
// Query the node by node name
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID})
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID, EnterpriseMeta: *entMeta})
if err != nil {
return true, 0, nil, nil, fmt.Errorf("node lookup failed: %s", err)
}
@ -1194,7 +1274,7 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st
}
// Attempt to lookup the node by its node ID
iter, err := tx.Get("nodes", "uuid_prefix", resizeNodeLookupKey(nodeNameOrID))
iter, err := tx.Get(tableNodes, "uuid_prefix", resizeNodeLookupKey(nodeNameOrID))
if err != nil {
ws.Add(watchCh)
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup
@ -1347,7 +1427,11 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
}
// Delete any checks associated with the service. This will invalidate
// sessions as necessary.
nsq := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
nsq := NodeServiceQuery{
Node: nodeName,
Service: serviceID,
EnterpriseMeta: *entMeta,
}
checks, err := tx.Get(tableChecks, indexNodeService, nsq)
if err != nil {
return fmt.Errorf("failed service check lookup: %s", err)
@ -1432,8 +1516,11 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
}
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error {
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeID})
func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string, entMeta *structs.EnterpriseMeta) error {
services, err := tx.Get(tableServices, indexNode, Query{
Value: nodeID,
EnterpriseMeta: *entMeta.WildcardEnterpriseMetaForPartition(),
})
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
}
@ -1483,7 +1570,11 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe
// checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check
existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{
EnterpriseMeta: hc.EnterpriseMeta,
Node: hc.Node,
CheckID: string(hc.CheckID),
})
if err != nil {
return fmt.Errorf("failed health check lookup: %s", err)
}
@ -1503,7 +1594,10 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
}
// Get the node
node, err := tx.First(tableNodes, indexID, Query{Value: hc.Node})
node, err := tx.First(tableNodes, indexID, Query{
Value: hc.Node,
EnterpriseMeta: hc.EnterpriseMeta,
})
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
@ -1515,7 +1609,11 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{
EnterpriseMeta: hc.EnterpriseMeta,
Node: hc.Node,
Service: hc.ServiceID,
})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1543,7 +1641,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
} else {
// Since the check has been modified, it impacts all services of node
// Update the status for all the services associated with this node
err = updateAllServiceIndexesOfNode(tx, idx, hc.Node)
err = updateAllServiceIndexesOfNode(tx, idx, hc.Node, &hc.EnterpriseMeta)
if err != nil {
return err
}
@ -1683,7 +1781,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
}
ws.Add(iter.WatchCh())
return parseChecksByNodeMeta(tx, ws, idx, iter, filters)
return parseChecksByNodeMeta(tx, ws, idx, iter, filters, entMeta)
}
// ChecksInState is used to query the state store for all checks
@ -1715,7 +1813,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters
return 0, nil, err
}
return parseChecksByNodeMeta(tx, ws, idx, iter, filters)
return parseChecksByNodeMeta(tx, ws, idx, iter, filters, entMeta)
}
func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) {
@ -1746,11 +1844,12 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
idx uint64, iter memdb.ResultIterator, filters map[string]string,
entMeta *structs.EnterpriseMeta) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get(tableNodes, indexID)
allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -1760,7 +1859,10 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
healthCheck := check.(*structs.HealthCheck)
watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{Value: healthCheck.Node})
watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: healthCheck.Node,
EnterpriseMeta: healthCheck.EnterpriseMeta,
})
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
@ -1871,7 +1973,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
return err
}
} else {
if err := updateAllServiceIndexesOfNode(tx, idx, existing.Node); err != nil {
if err := updateAllServiceIndexesOfNode(tx, idx, existing.Node, &existing.EnterpriseMeta); err != nil {
return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err)
}
if err := catalogUpdateServicesIndexes(tx, idx, entMeta); err != nil {
@ -1999,7 +2101,10 @@ func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, con
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
q := Query{
Value: serviceName,
EnterpriseMeta: *entMeta,
}
iter, err := tx.Get(tableServices, index, q)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
@ -2114,7 +2219,7 @@ func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, con
ws.Add(iter.WatchCh())
}
return parseCheckServiceNodes(tx, fallbackWS, idx, results, err)
return parseCheckServiceNodes(tx, fallbackWS, idx, results, entMeta, err)
}
// CheckServiceTagNodes is used to query all nodes and checks for a given
@ -2148,7 +2253,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
// Get the table index.
idx := maxIndexForService(tx, serviceName, serviceExists, true, entMeta)
return parseCheckServiceNodes(tx, ws, idx, results, err)
return parseCheckServiceNodes(tx, ws, idx, results, entMeta, err)
}
// GatewayServices is used to query all services associated with a gateway
@ -2181,6 +2286,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
func parseCheckServiceNodes(
tx ReadTxn, ws memdb.WatchSet, idx uint64,
services structs.ServiceNodes,
entMeta *structs.EnterpriseMeta,
err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil {
return 0, nil, err
@ -2194,7 +2300,7 @@ func parseCheckServiceNodes(
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get(tableNodes, indexID)
allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -2203,7 +2309,7 @@ func parseCheckServiceNodes(
// We need a similar fallback for checks. Since services need the
// status of node + service-specific checks, we pull in a top-level
// watch over all checks.
allChecks, err := tx.Get(tableChecks, indexID)
allChecks, err := tx.Get(tableChecks, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
}
@ -2212,7 +2318,10 @@ func parseCheckServiceNodes(
results := make(structs.CheckServiceNodes, 0, len(services))
for _, sn := range services {
// Retrieve the node.
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node})
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: sn.Node,
EnterpriseMeta: sn.EnterpriseMeta,
})
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
@ -2226,7 +2335,11 @@ func parseCheckServiceNodes(
// First add the node-level checks. These always apply to any
// service on the node.
var checks structs.HealthChecks
q := NodeServiceQuery{Node: sn.Node, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()}
q := NodeServiceQuery{
Node: sn.Node,
Service: "", // node checks have no service
EnterpriseMeta: *sn.EnterpriseMeta.WildcardEnterpriseMetaForPartition(),
}
iter, err := tx.Get(tableChecks, indexNodeService, q)
if err != nil {
return 0, nil, err
@ -2237,7 +2350,11 @@ func parseCheckServiceNodes(
}
// Now add the service-specific checks.
q = NodeServiceQuery{Node: sn.Node, Service: sn.ServiceID, EnterpriseMeta: sn.EnterpriseMeta}
q = NodeServiceQuery{
Node: sn.Node,
Service: sn.ServiceID,
EnterpriseMeta: sn.EnterpriseMeta,
}
iter, err = tx.Get(tableChecks, indexNodeService, q)
if err != nil {
return 0, nil, err
@ -2264,11 +2381,18 @@ func (s *Store) NodeInfo(ws memdb.WatchSet, node string, entMeta *structs.Enterp
tx := s.db.Txn(false)
defer tx.Abort()
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := catalogMaxIndex(tx, entMeta, true)
// Query the node by the passed node
nodes, err := tx.Get(tableNodes, indexID, Query{Value: node})
nodes, err := tx.Get(tableNodes, indexID, Query{
Value: node,
EnterpriseMeta: *entMeta,
})
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
@ -2287,7 +2411,7 @@ func (s *Store) NodeDump(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
idx := catalogMaxIndex(tx, entMeta, true)
// Fetch all of the registered nodes
nodes, err := tx.Get(tableNodes, indexID)
nodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
@ -2321,7 +2445,7 @@ func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.Enterpris
results = append(results, sn)
}
return parseCheckServiceNodes(tx, nil, idx, results, err)
return parseCheckServiceNodes(tx, nil, idx, results, entMeta, err)
}
func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
@ -2345,7 +2469,7 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind,
results = append(results, sn)
}
return parseCheckServiceNodes(tx, nil, idx, results, err)
return parseCheckServiceNodes(tx, nil, idx, results, entMeta, err)
}
// parseNodes takes an iterator over a set of nodes and returns a struct
@ -2360,14 +2484,14 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
// We don't want to track an unlimited number of services, so we pull a
// top-level watch to use as a fallback.
allServices, err := tx.Get(tableServices, indexID)
allServices, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
}
allServicesCh := allServices.WatchCh()
// We need a similar fallback for checks.
allChecks, err := tx.Get(tableChecks, indexID)
allChecks, err := tx.Get(tableChecks, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
}
@ -2381,6 +2505,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
dump := &structs.NodeInfo{
ID: node.ID,
Node: node.Node,
Partition: node.Partition,
Address: node.Address,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
@ -2793,7 +2918,10 @@ func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind str
maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex)
// Look up nodes for gateway
q := Query{Value: mapping.Gateway.Name, EnterpriseMeta: mapping.Gateway.EnterpriseMeta}
q := Query{
Value: mapping.Gateway.Name,
EnterpriseMeta: mapping.Gateway.EnterpriseMeta,
}
gwServices, err := tx.Get(tableServices, indexService, q)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
@ -3262,8 +3390,8 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
oldUpstreams := make(map[structs.ServiceName]bool)
if e, ok := existing.(*structs.ServiceNode); ok {
for _, u := range e.ServiceProxy.Upstreams {
upstreamMeta := structs.NewEnterpriseMetaInDefaultPartition(u.DestinationNamespace)
sn := structs.NewServiceName(u.DestinationName, &upstreamMeta)
upstreamMeta := e.NewEnterpriseMetaInPartition(u.DestinationNamespace)
sn := structs.NewServiceName(u.DestinationName, upstreamMeta)
oldUpstreams[sn] = true
}
@ -3278,8 +3406,8 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
}
// TODO (freddy): Account for upstream datacenter
upstreamMeta := structs.NewEnterpriseMetaInDefaultPartition(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
upstreamMeta := svc.NewEnterpriseMetaInPartition(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, upstreamMeta)
obj, err := tx.First(tableMeshTopology, indexID, upstream, downstream)
if err != nil {

View File

@ -284,7 +284,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return nil
},
Mutate: func(s *Store, tx *txn) error {
return s.deleteNodeTxn(tx, tx.Index, "node1")
return s.deleteNodeTxn(tx, tx.Index, "node1", nil)
},
WantEvents: []stream.Event{
// Should publish deregistration events for all services
@ -1699,6 +1699,7 @@ func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequ
ID: "11111111-2222-3333-4444-555555555555",
Node: "node1",
Address: "10.10.10.10",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
Checks: structs.HealthChecks{
&structs.HealthCheck{
CheckID: "serf-health",
@ -1722,6 +1723,7 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc
ID: svc,
Service: svc,
Port: 8080,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
r.Checks = append(r.Checks,
&structs.HealthCheck{
@ -1732,6 +1734,7 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc
ServiceName: svc,
Type: "ttl",
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
})
for _, opt := range opts {
err := opt(r)
@ -1753,6 +1756,7 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.ID = "11111111-2222-3333-4444-555555555555"
csn.Node.Address = "10.10.10.10"
csn.Node.Partition = structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty()
for _, opt := range opts {
if err := opt(&e); err != nil {

View File

@ -26,6 +26,15 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
}
}
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error {
// overall nodes index
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// overall services index
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
@ -60,6 +69,29 @@ func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.Ent
return nil
}
func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
// ensure that the Partition is always clear within the state store in OSS
node.Partition = ""
// Insert the node and update the index.
if err := tx.Insert(tableNodes, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
}
if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta()); err != nil {
return err
}
// Update the node's service indexes as the node information is included
// in health queries and we would otherwise miss node updates in some cases
// for those queries.
if err := updateAllServiceIndexesOfNode(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta()); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
// Insert the service and update the index
if err := tx.Insert(tableServices, svc); err != nil {
@ -81,6 +113,10 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
return nil
}
func catalogNodesMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableNodes)
}
func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableServices)
}
@ -107,16 +143,16 @@ func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (i
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks {
return maxIndexTxn(tx, "nodes", tableServices, tableChecks)
return maxIndexTxn(tx, tableNodes, tableServices, tableChecks)
}
return maxIndexTxn(tx, "nodes", tableServices)
return maxIndexTxn(tx, tableNodes, tableServices)
}
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks {
return maxIndexWatchTxn(tx, ws, "nodes", tableServices, tableChecks)
return maxIndexWatchTxn(tx, ws, tableNodes, tableServices, tableChecks)
}
return maxIndexWatchTxn(tx, ws, "nodes", tableServices)
return maxIndexWatchTxn(tx, ws, tableNodes, tableServices)
}
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {

View File

@ -185,7 +185,25 @@ func testIndexerTableNodes() map[string]indexerTestCase {
source: &structs.Node{Node: "NoDeId"},
expected: []byte("nodeid\x00"),
},
prefix: []indexValue{
{
source: (*structs.EnterpriseMeta)(nil),
expected: nil,
},
{
source: structs.EnterpriseMeta{},
expected: nil,
},
{
source: Query{Value: "NoDeId"},
expected: []byte("nodeid\x00"),
},
},
},
// TODO: uuid
// TODO: meta
// TODO(partitions): fix schema tests for tables that reference nodes too
}
}

View File

@ -38,9 +38,10 @@ func nodesTableSchema() *memdb.TableSchema {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: indexerSingle{
Indexer: indexerSingleWithPrefix{
readIndex: indexFromQuery,
writeIndex: indexFromNode,
prefixIndex: prefixIndexFromQueryNoNamespace,
},
},
"uuid": {

View File

@ -171,6 +171,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
ID: nodeID,
Node: "node1",
Address: "1.2.3.4",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
TaggedAddresses: map[string]string{"hello": "world"},
Meta: map[string]string{"somekey": "somevalue"},
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1},
@ -1316,7 +1317,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
}
// Indexes were updated.
for _, tbl := range []string{"nodes", tableServices, tableChecks} {
for _, tbl := range []string{tableNodes, tableServices, tableChecks} {
if idx := s.maxIndex(tbl); idx != 3 {
t.Fatalf("bad index: %d (%s)", idx, tbl)
}
@ -1327,7 +1328,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
if err := s.DeleteNode(4, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("nodes"); idx != 3 {
if idx := s.maxIndex(tableNodes); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}
@ -4237,6 +4238,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
expect := structs.NodeDump{
&structs.NodeInfo{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
@ -4294,6 +4296,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
},
&structs.NodeInfo{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node2",
@ -7518,3 +7521,46 @@ func TestProtocolForIngressGateway(t *testing.T) {
})
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) {
t.Helper()
all := dumpMaxIndexes(t, tx)
for _, index := range skip {
if _, ok := all[index]; ok {
delete(all, index)
} else {
t.Logf("index %q isn't even set; probably test assertion isn't relevant anymore", index)
}
}
require.Equal(t, expect, all)
// TODO
// for _, index := range indexes {
// require.Equal(t, expectIndex, maxIndexTxn(tx, index),
// "index %s has the wrong value", index)
// }
}
func dumpMaxIndexes(t *testing.T, tx ReadTxn) map[string]uint64 {
out := make(map[string]uint64)
iter, err := tx.Get(tableIndex, "id")
require.NoError(t, err)
for entry := iter.Next(); entry != nil; entry = iter.Next() {
if idx, ok := entry.(*IndexEntry); ok {
out[idx.Key] = idx.Value
}
}
return out
}

View File

@ -2,6 +2,7 @@ package state
import (
"fmt"
"strings"
"github.com/hashicorp/go-memdb"
@ -9,39 +10,88 @@ import (
"github.com/hashicorp/consul/lib"
)
const tableCoordinates = "coordinates"
func indexFromCoordinate(raw interface{}) ([]byte, error) {
c, ok := raw.(*structs.Coordinate)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.Coordinate index", raw)
}
if c.Node == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(c.Node))
b.String(strings.ToLower(c.Segment))
return b.Bytes(), nil
}
func indexNodeFromCoordinate(raw interface{}) ([]byte, error) {
c, ok := raw.(*structs.Coordinate)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.Coordinate index", raw)
}
if c.Node == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(c.Node))
return b.Bytes(), nil
}
func indexFromCoordinateQuery(raw interface{}) ([]byte, error) {
q, ok := raw.(CoordinateQuery)
if !ok {
return nil, fmt.Errorf("unexpected type %T for CoordinateQuery index", raw)
}
if q.Node == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(q.Node))
b.String(strings.ToLower(q.Segment))
return b.Bytes(), nil
}
type CoordinateQuery struct {
Node string
Segment string
Partition string
}
func (c CoordinateQuery) PartitionOrDefault() string {
return structs.PartitionOrDefault(c.Partition)
}
// coordinatesTableSchema returns a new table schema used for storing
// network coordinates.
func coordinatesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "coordinates",
Name: tableCoordinates,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
Indexer: indexerSingleWithPrefix{
readIndex: indexFromCoordinateQuery,
writeIndex: indexFromCoordinate,
prefixIndex: prefixIndexFromQueryNoNamespace,
},
},
},
},
"node": {
Name: "node",
indexNode: {
Name: indexNode,
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
Indexer: indexerSingle{
readIndex: indexFromQuery,
writeIndex: indexNodeFromCoordinate,
},
},
},
@ -50,7 +100,7 @@ func coordinatesTableSchema() *memdb.TableSchema {
// Coordinates is used to pull all the coordinates from the snapshot.
func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("coordinates", "id")
iter, err := s.tx.Get(tableCoordinates, indexID)
if err != nil {
return nil, err
}
@ -68,28 +118,31 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
continue
}
if err := s.tx.Insert("coordinates", update); err != nil {
if err := ensureCoordinateTxn(s.tx, idx, update); err != nil {
return fmt.Errorf("failed restoring coordinate: %s", err)
}
}
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) {
// TODO(partitions): use the provided entmeta
func (s *Store) Coordinate(ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
tableIdx := maxIndexTxn(tx, "coordinates")
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
iter, err := tx.Get("coordinates", "node", node)
tableIdx := coordinatesMaxIndex(tx, entMeta)
iter, err := tx.Get(tableCoordinates, indexNode, Query{
Value: node,
EnterpriseMeta: *entMeta,
})
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
@ -104,16 +157,20 @@ func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.Enterprise
}
// Coordinates queries for all nodes with coordinates.
func (s *Store) Coordinates(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) {
// TODO(partitions): use the provided entmeta
func (s *Store) Coordinates(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Get the table index.
idx := maxIndexTxn(tx, "coordinates")
idx := coordinatesMaxIndex(tx, entMeta)
// Pull all the coordinates.
iter, err := tx.Get("coordinates", "id")
iter, err := tx.Get(tableCoordinates, indexID+"_prefix", entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
@ -140,6 +197,8 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
continue
}
entMeta := update.GetEnterpriseMeta()
// Since the cleanup of coordinates is tied to deletion of
// nodes, we silently drop any updates for nodes that we don't
// know about. This might be possible during normal operation
@ -148,7 +207,10 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
// don't carefully sequence this, and since it will fix itself
// on the next coordinate update from that node, we don't return
// an error or log anything.
node, err := tx.First(tableNodes, indexID, Query{Value: update.Node})
node, err := tx.First(tableNodes, indexID, Query{
Value: update.Node,
EnterpriseMeta: *entMeta,
})
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
@ -156,15 +218,22 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
continue
}
if err := tx.Insert("coordinates", update); err != nil {
if err := ensureCoordinateTxn(tx, idx, update); err != nil {
return fmt.Errorf("failed inserting coordinate: %s", err)
}
}
// Update the index.
if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return tx.Commit()
}
func deleteCoordinateTxn(tx WriteTxn, idx uint64, coord *structs.Coordinate) error {
if err := tx.Delete(tableCoordinates, coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err)
}
if err := updateCoordinatesIndexes(tx, idx, coord.GetEnterpriseMeta()); err != nil {
return fmt.Errorf("failed updating coordinate index: %s", err)
}
return nil
}

View File

@ -0,0 +1,36 @@
// +build !consulent
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
)
func coordinatesMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, tableCoordinates)
}
func updateCoordinatesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error {
// Update the index.
if err := indexUpdateMaxTxn(tx, idx, tableCoordinates); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
func ensureCoordinateTxn(tx WriteTxn, idx uint64, coord *structs.Coordinate) error {
// ensure that the Partition is always empty within the state store
coord.Partition = ""
if err := tx.Insert(tableCoordinates, coord); err != nil {
return fmt.Errorf("failed inserting coordinate: %s", err)
}
if err := updateCoordinatesIndexes(tx, idx, coord.GetEnterpriseMeta()); err != nil {
return fmt.Errorf("failed updating coordinate index: %s", err)
}
return nil
}

View File

@ -0,0 +1,54 @@
// +build !consulent
package state
import "github.com/hashicorp/consul/agent/structs"
func testIndexerTableCoordinates() map[string]indexerTestCase {
return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: CoordinateQuery{
Node: "NoDeId",
Segment: "SeGmEnT",
},
expected: []byte("nodeid\x00segment\x00"),
},
write: indexValue{
source: &structs.Coordinate{
Node: "NoDeId",
Segment: "SeGmEnT",
},
expected: []byte("nodeid\x00segment\x00"),
},
prefix: []indexValue{
{
source: (*structs.EnterpriseMeta)(nil),
expected: nil,
},
{
source: structs.EnterpriseMeta{},
expected: nil,
},
{
source: Query{Value: "NoDeId"},
expected: []byte("nodeid\x00"),
},
},
},
indexNode: {
read: indexValue{
source: Query{
Value: "NoDeId",
},
expected: []byte("nodeid\x00"),
},
write: indexValue{
source: &structs.Coordinate{
Node: "NoDeId",
},
expected: []byte("nodeid\x00"),
},
},
}
}

View File

@ -41,19 +41,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet()
idx, all, err := s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(0), idx, "bad index")
require.Nil(t, all)
coordinateWs := memdb.NewWatchSet()
_, coords, err := s.Coordinate(coordinateWs, "nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
require.Equal(t, lib.CoordinateSet{}, coords)
// Make an update for nodes that don't exist and make sure they get
@ -68,40 +62,26 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
t.Fatalf("err: %s", err)
}
if watchFired(ws) || watchFired(coordinateWs) {
t.Fatalf("bad")
}
require.NoError(t, s.CoordinateBatchUpdate(1, updates))
require.False(t, watchFired(ws) || watchFired(coordinateWs))
// Should still be empty, though applying an empty batch does bump
// Should still be empty, though applying an empty batch does NOT bump
// the table index.
ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(0), idx, "bad index")
require.Nil(t, all)
coordinateWs = memdb.NewWatchSet()
idx, _, err = s.Coordinate(coordinateWs, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(0), idx, "bad index")
// Register the nodes then do the update again.
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(3, updates))
if !watchFired(ws) || !watchFired(coordinateWs) {
t.Fatalf("bad")
}
@ -109,12 +89,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now.
ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(3), idx, "bad index")
require.Equal(t, updates, all)
// Also verify the per-node coordinate interface.
@ -122,12 +98,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
for i, update := range updates {
nodeWs[i] = memdb.NewWatchSet()
idx, coords, err := s.Coordinate(nodeWs[i], update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(3), idx, "bad index")
expected := lib.CoordinateSet{
"": update.Coord,
}
@ -136,9 +108,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Update the coordinate for one of the nodes.
updates[1].Coord = generateRandomCoordinate()
if err := s.CoordinateBatchUpdate(4, updates); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(4, updates))
if !watchFired(ws) {
t.Fatalf("bad")
}
@ -150,23 +120,15 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify it got applied.
idx, all, err = s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(4), idx, "bad index")
require.Equal(t, updates, all)
// And check the per-node coordinate version of the same thing.
for _, update := range updates {
idx, coords, err := s.Coordinate(nil, update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(4), idx, "bad index")
expected := lib.CoordinateSet{
"": update.Coord,
}
@ -180,19 +142,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
Coord: &coordinate.Coordinate{Height: math.NaN()},
},
}
if err := s.CoordinateBatchUpdate(5, badUpdates); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(5, badUpdates))
// Verify we are at the previous state, though the empty batch does bump
// the table index.
// Verify we are at the previous state, and verify that the empty batch
// does NOT bump the table index.
idx, all, err = s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(4), idx, "bad index")
require.Equal(t, updates, all)
}
@ -213,15 +169,11 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(2, updates))
// Make sure it's in there.
_, coords, err := s.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
expected := lib.CoordinateSet{
"alpha": updates[0].Coord,
"beta": updates[1].Coord,
@ -229,25 +181,17 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
require.Equal(t, expected, coords)
// Now delete the node.
if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.DeleteNode(3, "node1", nil))
// Make sure the coordinate is gone.
_, coords, err = s.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
require.Equal(t, lib.CoordinateSet{}, coords)
// Make sure the index got updated.
idx, all, err := s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(3), idx, "bad index")
require.Nil(t, all)
}
@ -267,9 +211,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(3, updates))
// Manually put a bad coordinate in for node3.
testRegisterNode(t, s, 4, "node3")
@ -278,9 +220,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
Coord: &coordinate.Coordinate{Height: math.NaN()},
}
tx := s.db.WriteTxn(5)
if err := tx.Insert("coordinates", badUpdate); err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, tx.Insert("coordinates", badUpdate))
require.NoError(t, tx.Commit())
// Snapshot the coordinates.
@ -298,18 +238,13 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(5, trash); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, s.CoordinateBatchUpdate(5, trash))
// Verify the snapshot.
if idx := snap.LastIndex(); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
require.Equal(t, uint64(4), snap.LastIndex(), "bad index")
iter, err := snap.Coordinates()
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
var dump structs.Coordinates
for coord := iter.Next(); coord != nil; coord = iter.Next() {
dump = append(dump, coord.(*structs.Coordinate))
@ -319,30 +254,20 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
// the read side.
require.Equal(t, append(updates, badUpdate), dump)
// Restore the values into a new state store.
func() {
runStep(t, "restore the values into a new state store", func(t *testing.T) {
s := testStateStore(t)
restore := s.Restore()
if err := restore.Coordinates(6, dump); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, restore.Coordinates(6, dump))
restore.Commit()
// Read the restored coordinates back out and verify that they match.
idx, res, err := s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
require.NoError(t, err)
require.Equal(t, uint64(6), idx, "bad index")
require.Equal(t, updates, res)
// Check that the index was updated (note that it got passed
// in during the restore).
if idx := s.maxIndex("coordinates"); idx != 6 {
t.Fatalf("bad index: %d", idx)
}
}()
require.Equal(t, uint64(6), s.maxIndex("coordinates"), "bad index")
})
}

View File

@ -2,6 +2,7 @@ package state
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
)
@ -113,6 +114,15 @@ func (b *indexBuilder) String(v string) {
(*bytes.Buffer)(b).WriteString(null)
}
func (b *indexBuilder) Int64(v int64) {
const size = binary.MaxVarintLen64
// Get the value and encode it
buf := make([]byte, size)
binary.PutVarint(buf, v)
b.Raw(buf)
}
// Raw appends the bytes without a null terminator to the buffer. Raw should
// only be used when v has a fixed length, or when building the last segment of
// a prefix index.

View File

@ -40,6 +40,17 @@ func indexFromQuery(arg interface{}) ([]byte, error) {
return b.Bytes(), nil
}
func indexFromServiceNameAsString(arg interface{}) ([]byte, error) {
sn, ok := arg.(structs.ServiceName)
if !ok {
return nil, fmt.Errorf("unexpected type %T for ServiceName index", arg)
}
var b indexBuilder
b.String(strings.ToLower(sn.String()))
return b.Bytes(), nil
}
// uuidStringToBytes is a modified version of memdb.UUIDFieldIndex.parseString
func uuidStringToBytes(uuid string) ([]byte, error) {
l := len(uuid)

View File

@ -23,3 +23,22 @@ func prefixIndexFromQuery(arg interface{}) ([]byte, error) {
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
}
func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
return prefixIndexFromQuery(arg)
}
func prefixIndexFromServiceNameAsString(arg interface{}) ([]byte, error) {
var b indexBuilder
switch v := arg.(type) {
case *structs.EnterpriseMeta:
return nil, nil
case structs.EnterpriseMeta:
return nil, nil
case structs.ServiceName:
b.String(strings.ToLower(v.String()))
return b.Bytes(), nil
}
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
}

View File

@ -36,14 +36,18 @@ func TestNewDBSchema_Indexers(t *testing.T) {
require.NoError(t, schema.Validate())
var testcases = map[string]func() map[string]indexerTestCase{
// acl
tableACLPolicies: testIndexerTableACLPolicies,
tableACLRoles: testIndexerTableACLRoles,
// catalog
tableChecks: testIndexerTableChecks,
tableServices: testIndexerTableServices,
tableNodes: testIndexerTableNodes,
tableConfigEntries: testIndexerTableConfigEntries,
tableCoordinates: testIndexerTableCoordinates,
tableMeshTopology: testIndexerTableMeshTopology,
tableGatewayServices: testIndexerTableGatewayServices,
// config
tableConfigEntries: testIndexerTableConfigEntries,
}
addEnterpriseIndexerTestCases(testcases)

View File

@ -57,18 +57,37 @@ func testStateStore(t *testing.T) *Store {
}
func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
testRegisterNodeOpts(t, s, idx, nodeID)
}
// testRegisterNodeWithChange registers a node and ensures it gets different from previous registration
func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) {
testRegisterNodeWithMeta(t, s, idx, nodeID, map[string]string{
testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(map[string]string{
"version": fmt.Sprint(idx),
})
}))
}
func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
node := &structs.Node{Node: nodeID, Meta: meta}
testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(meta))
}
type regNodeOption func(*structs.Node) error
func regNodeWithMeta(meta map[string]string) func(*structs.Node) error {
return func(node *structs.Node) error {
node.Meta = meta
return nil
}
}
func testRegisterNodeOpts(t *testing.T, s *Store, idx uint64, nodeID string, opts ...regNodeOption) {
node := &structs.Node{Node: nodeID}
for _, opt := range opts {
if err := opt(node); err != nil {
t.Fatalf("err: %s", err)
}
}
if err := s.EnsureNode(idx, node); err != nil {
t.Fatalf("err: %s", err)
}
@ -283,7 +302,7 @@ func TestStateStore_maxIndex(t *testing.T) {
testRegisterNode(t, s, 1, "bar")
testRegisterService(t, s, 2, "foo", "consul")
if max := s.maxIndex("nodes", tableServices); max != 2 {
if max := s.maxIndex(tableNodes, tableServices); max != 2 {
t.Fatalf("bad max: %d", max)
}
}
@ -295,12 +314,12 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) {
testRegisterNode(t, s, 1, "bar")
tx := s.db.WriteTxnRestore()
if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil {
if err := indexUpdateMaxTxn(tx, 3, tableNodes); err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, tx.Commit())
if max := s.maxIndex("nodes"); max != 3 {
if max := s.maxIndex(tableNodes); max != 3 {
t.Fatalf("bad max: %d", max)
}
}

View File

@ -149,11 +149,13 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs
var entry *structs.Node
var err error
// TODO(partitions): change these errors to include node partitions when printing
getNode := func() (*structs.Node, error) {
if op.Node.ID != "" {
return getNodeIDTxn(tx, op.Node.ID)
} else {
return getNodeTxn(tx, op.Node.Node)
return getNodeTxn(tx, op.Node.Node, op.Node.GetEnterpriseMeta())
}
}
@ -180,11 +182,11 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs
entry, err = getNode()
case api.NodeDelete:
err = s.deleteNodeTxn(tx, idx, op.Node.Node)
err = s.deleteNodeTxn(tx, idx, op.Node.Node, op.Node.GetEnterpriseMeta())
case api.NodeDeleteCAS:
var ok bool
ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node)
ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node, op.Node.GetEnterpriseMeta())
if !ok && err == nil {
err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node)
}

View File

@ -321,6 +321,7 @@ func TestStateStore_Txn_Service(t *testing.T) {
expectedServices := &structs.NodeServices{
Node: &structs.Node{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,

View File

@ -115,6 +115,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc1",
Address: "3.4.5.6",
RaftIndex: raftIndex(ids, "reg2", "reg2"),
@ -145,6 +146,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc1",
Address: "1.2.3.4",
RaftIndex: raftIndex(ids, "reg3", "reg3"),
@ -194,6 +196,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc1",
Address: "1.2.3.4",
RaftIndex: raftIndex(ids, "reg3", "reg3"),
@ -465,6 +468,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc2",
Address: "3.4.5.6",
RaftIndex: raftIndex(ids, "reg2", "reg2"),
@ -495,6 +499,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc2",
Address: "1.2.3.4",
RaftIndex: raftIndex(ids, "reg3", "reg3"),
@ -544,6 +549,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Datacenter: "dc2",
Address: "1.2.3.4",
RaftIndex: raftIndex(ids, "reg3", "reg3"),

View File

@ -437,7 +437,7 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
// Check if any of the node-level fields are being changed.
if r.ID != node.ID ||
r.Node != node.Node ||
// TODO(partitions): do we need to check partition here?
r.PartitionOrDefault() != node.PartitionOrDefault() ||
r.Address != node.Address ||
r.Datacenter != node.Datacenter ||
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) ||

View File

@ -51,6 +51,7 @@ func TestAPI_CatalogNodes(t *testing.T) {
{
ID: s.Config.NodeID,
Node: s.Config.NodeName,
Partition: defaultPartition,
Address: "127.0.0.1",
Datacenter: "dc1",
TaggedAddresses: map[string]string{

View File

@ -170,6 +170,7 @@ func TestAPI_ClientTxn(t *testing.T) {
Node: &Node{
ID: nodeID,
Node: "foo",
Partition: defaultPartition,
Address: "2.2.2.2",
Datacenter: "dc1",
CreateIndex: ret.Results[2].Node.CreateIndex,
@ -269,6 +270,7 @@ func TestAPI_ClientTxn(t *testing.T) {
Node: &Node{
ID: s.Config.NodeID,
Node: s.Config.NodeName,
Partition: defaultPartition,
Address: "127.0.0.1",
Datacenter: "dc1",
TaggedAddresses: map[string]string{
@ -283,7 +285,7 @@ func TestAPI_ClientTxn(t *testing.T) {
},
},
}
require.Equal(r, ret.Results, expected)
require.Equal(r, expected, ret.Results)
})
// Sanity check using the regular GET API.