From 310e775a8a6f3734a1e0626bf496f2fdf7fb92ff Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Tue, 17 Aug 2021 13:29:39 -0500 Subject: [PATCH] state: partition nodes and coordinates in the state store (#10859) Additionally: - partitioned the catalog indexes appropriately for partitioning - removed a stray reference to a non-existent index named "node.checks" --- agent/consul/fsm/commands_oss_test.go | 22 +- agent/consul/fsm/snapshot_oss.go | 9 +- agent/consul/internal_endpoint_test.go | 4 + agent/consul/state/catalog.go | 342 +++++++++++++++------- agent/consul/state/catalog_events_test.go | 34 ++- agent/consul/state/catalog_oss.go | 44 ++- agent/consul/state/catalog_oss_test.go | 18 ++ agent/consul/state/catalog_schema.go | 7 +- agent/consul/state/catalog_test.go | 54 +++- agent/consul/state/coordinate.go | 155 +++++++--- agent/consul/state/coordinate_oss.go | 36 +++ agent/consul/state/coordinate_oss_test.go | 54 ++++ agent/consul/state/coordinate_test.go | 161 +++------- agent/consul/state/indexer.go | 10 + agent/consul/state/query.go | 11 + agent/consul/state/query_oss.go | 19 ++ agent/consul/state/schema_test.go | 10 +- agent/consul/state/state_store_test.go | 33 ++- agent/consul/state/txn.go | 8 +- agent/consul/state/txn_test.go | 3 +- agent/rpc/subscribe/subscribe_test.go | 6 + agent/structs/structs.go | 2 +- api/catalog_test.go | 1 + api/txn_test.go | 4 +- 24 files changed, 723 insertions(+), 324 deletions(-) create mode 100644 agent/consul/state/coordinate_oss.go create mode 100644 agent/consul/state/coordinate_oss_test.go diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 50194d4419..31f20bf4ec 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -53,9 +53,10 @@ func TestFSM_RegisterNode(t *testing.T) { } req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(), } buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { @@ -114,6 +115,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { Status: api.HealthPassing, ServiceID: "db", }, + EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(), } buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { @@ -712,12 +714,14 @@ func TestFSM_CoordinateUpdate(t *testing.T) { // Write a batch of two coordinates. updates := structs.Coordinates{ &structs.Coordinate{ - Node: "node1", - Coord: generateRandomCoordinate(), + Node: "node1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), + Coord: generateRandomCoordinate(), }, &structs.Coordinate{ - Node: "node2", - Coord: generateRandomCoordinate(), + Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), + Coord: generateRandomCoordinate(), }, } buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates) @@ -734,9 +738,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } + require.Equal(t, updates, coords) } func TestFSM_SessionCreate_Destroy(t *testing.T) { diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 5728e9c76b..57b3e81d8f 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -96,6 +96,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, // Register each node for node := nodes.Next(); node != nil; node = nodes.Next() { n := node.(*structs.Node) + nodeEntMeta := n.GetEnterpriseMeta() + req := structs.RegisterRequest{ ID: n.ID, Node: n.Node, @@ -104,6 +106,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, TaggedAddresses: n.TaggedAddresses, NodeMeta: n.Meta, RaftIndex: n.RaftIndex, + EnterpriseMeta: *nodeEntMeta, } // Register the node itself @@ -115,8 +118,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, } // Register each service this node has - // TODO(partitions) - services, err := s.state.Services(n.Node, nil) + services, err := s.state.Services(n.Node, nodeEntMeta) if err != nil { return err } @@ -132,8 +134,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, // Register each check this node has req.Service = nil - // TODO(partitions) - checks, err := s.state.Checks(n.Node, nil) + checks, err := s.state.Checks(n.Node, nodeEntMeta) if err != nil { return err } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index a4d64d2565..5758aa89f7 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -876,6 +876,7 @@ func TestInternal_GatewayServiceDump_Terminating(t *testing.T) { { Node: &structs.Node{ Node: "baz", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Address: "127.0.0.3", Datacenter: "dc1", }, @@ -908,6 +909,7 @@ func TestInternal_GatewayServiceDump_Terminating(t *testing.T) { { Node: &structs.Node{ Node: "bar", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Address: "127.0.0.2", Datacenter: "dc1", }, @@ -1215,6 +1217,7 @@ func TestInternal_GatewayServiceDump_Ingress(t *testing.T) { { Node: &structs.Node{ Node: "bar", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Address: "127.0.0.2", Datacenter: "dc1", }, @@ -1250,6 +1253,7 @@ func TestInternal_GatewayServiceDump_Ingress(t *testing.T) { { Node: &structs.Node{ Node: "baz", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Address: "127.0.0.3", Datacenter: "dc1", }, diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index c9b7e97d08..c26d10f603 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -49,16 +49,28 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) { // Services is used to pull the full list of services for a given node for use // during snapshots. -func (s *Snapshot) Services(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - // TODO(partitions): use the provided entmeta - return s.tx.Get(tableServices, indexNode, Query{Value: node}) +func (s *Snapshot) Services(node string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + return s.tx.Get(tableServices, indexNode, Query{ + Value: node, + EnterpriseMeta: *entMeta, + }) } // Checks is used to pull the full list of checks for a given node for use // during snapshots. -func (s *Snapshot) Checks(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - // TODO(partitions): use the provided entmeta - return s.tx.Get(tableChecks, indexNode, Query{Value: node}) +func (s *Snapshot) Checks(node string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + return s.tx.Get(tableChecks, indexNode, Query{ + Value: node, + EnterpriseMeta: *entMeta, + }) } // Registration is used to make sure a node, service, and check registration is @@ -83,6 +95,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err } func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error { + // TODO(partitions): do we have to check partition here? probably not if check.Node != node { return fmt.Errorf("check node %q does not match node %q", check.Node, node) @@ -107,6 +120,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b Node: req.Node, Address: req.Address, Datacenter: req.Datacenter, + Partition: req.PartitionOrDefault(), TaggedAddresses: req.TaggedAddresses, Meta: req.NodeMeta, } @@ -121,7 +135,10 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b // modify the node at all so we prevent watch churn and useless writes // and modify index bumps on the node. { - existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node}) + existing, err := tx.First(tableNodes, indexID, Query{ + Value: node.Node, + EnterpriseMeta: *node.GetEnterpriseMeta(), + }) if err != nil { return fmt.Errorf("node lookup failed: %s", err) } @@ -136,7 +153,11 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b // node info above to make sure we actually need to update the service // definition in order to prevent useless churn if nothing has changed. if req.Service != nil { - existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID}) + existing, err := tx.First(tableServices, indexID, NodeServiceQuery{ + EnterpriseMeta: req.Service.EnterpriseMeta, + Node: req.Node, + Service: req.Service.ID, + }) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -180,7 +201,8 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error { // If allowClashWithoutID then, getting a conflict on another node without ID will be allowed func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error { // Retrieve all of the nodes - enodes, err := tx.Get(tableNodes, indexID) + + enodes, err := tx.Get(tableNodes, indexID+"_prefix", node.GetEnterpriseMeta()) if err != nil { return fmt.Errorf("Cannot lookup all nodes: %s", err) } @@ -189,7 +211,11 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID { // Look up the existing node's Serf health check to see if it's failed. // If it is, the node can be renamed. - enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) + enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{ + EnterpriseMeta: *node.GetEnterpriseMeta(), + Node: enode.Node, + CheckID: string(structs.SerfCheckID), + }) if err != nil { return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err) } @@ -216,7 +242,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi // Returns a bool indicating if a write happened and any error. func (s *Store) ensureNodeCASTxn(tx WriteTxn, idx uint64, node *structs.Node) (bool, error) { // Retrieve the existing entry. - existing, err := getNodeTxn(tx, node.Node) + existing, err := getNodeTxn(tx, node.Node, node.GetEnterpriseMeta()) if err != nil { return false, err } @@ -249,6 +275,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod // name is the same. var n *structs.Node if node.ID != "" { + // TODO(partitions): should this take a node ent-meta? existing, err := getNodeIDTxn(tx, node.ID) if err != nil { return fmt.Errorf("node lookup failed: %s", err) @@ -262,7 +289,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod return fmt.Errorf("Error while renaming Node ID: %q (%s): %s", node.ID, node.Address, dupNameError) } // We are actually renaming a node, remove its reference first - err := s.deleteNodeTxn(tx, idx, n.Node) + err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta()) if err != nil { return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s", node.ID, node.Address, n.Node, node.Node) @@ -282,7 +309,10 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod // Check for an existing node by name to support nodes with no IDs. if n == nil { - existing, err := tx.First(tableNodes, indexID, Query{Value: node.Node}) + existing, err := tx.First(tableNodes, indexID, Query{ + Value: node.Node, + EnterpriseMeta: *node.GetEnterpriseMeta(), + }) if err != nil { return fmt.Errorf("node name lookup failed: %s", err) } @@ -314,41 +344,35 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod } // Insert the node and update the index. - if err := tx.Insert("nodes", node); err != nil { - return fmt.Errorf("failed inserting node: %s", err) - } - if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - // Update the node's service indexes as the node information is included - // in health queries and we would otherwise miss node updates in some cases - // for those queries. - if err := updateAllServiceIndexesOfNode(tx, idx, node.Node); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - return nil + return catalogInsertNode(tx, node) } // GetNode is used to retrieve a node registration by node name ID. -func (s *Store) GetNode(nodeNameOrID string, _ *structs.EnterpriseMeta) (uint64, *structs.Node, error) { - // TODO(partitions): use the provided entmeta +func (s *Store) GetNode(nodeNameOrID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.Node, error) { tx := s.db.Txn(false) defer tx.Abort() + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Get the table index. - idx := maxIndexTxn(tx, "nodes") + idx := catalogNodesMaxIndex(tx, entMeta) // Retrieve the node from the state store - node, err := getNodeTxn(tx, nodeNameOrID) + node, err := getNodeTxn(tx, nodeNameOrID, entMeta) if err != nil { return 0, nil, fmt.Errorf("node lookup failed: %s", err) } return idx, node, nil } -func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) { - node, err := tx.First(tableNodes, indexID, Query{Value: nodeName}) +func getNodeTxn(tx ReadTxn, nodeNameOrID string, entMeta *structs.EnterpriseMeta) (*structs.Node, error) { + node, err := tx.First(tableNodes, indexID, Query{ + Value: nodeNameOrID, + EnterpriseMeta: *entMeta, + }) if err != nil { return nil, fmt.Errorf("node lookup failed: %s", err) } @@ -365,7 +389,7 @@ func getNodeIDTxn(tx ReadTxn, id types.NodeID) (*structs.Node, error) { return nil, fmt.Errorf("node lookup by ID failed, wrong UUID: %v for '%s'", err, strnode) } - node, err := tx.First("nodes", "uuid", uuidValue) + node, err := tx.First(tableNodes, "uuid", uuidValue) if err != nil { return nil, fmt.Errorf("node lookup by ID failed: %s", err) } @@ -381,7 +405,9 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) { defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, "nodes") + /// + // NOTE: nodeIDs aren't partitioned so don't use the convenience function. + idx := maxIndexTxn(tx, tableNodes) // Retrieve the node from the state store node, err := getNodeIDTxn(tx, id) @@ -389,16 +415,20 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) { - // TODO(partitions): use the provided entmeta +func (s *Store) Nodes(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Get the table index. - idx := maxIndexTxn(tx, "nodes") + idx := catalogNodesMaxIndex(tx, entMeta) // Retrieve all of the nodes - nodes, err := tx.Get(tableNodes, indexID) + nodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -413,13 +443,20 @@ func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, str } // NodesByMeta is used to return all nodes with the given metadata key/value pairs. -func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) { - // TODO(partitions): use the provided entmeta +func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, entMeta *structs.EnterpriseMeta) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Get the table index. - idx := maxIndexTxn(tx, "nodes") + idx := maxIndexTxn(tx, tableNodes) + // TODO:(partitions) use the partitioned meta index + // idx := catalogNodesMaxIndex(tx, entMeta) + _ = entMeta // Retrieve all of the nodes var args []interface{} @@ -427,7 +464,8 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *str args = append(args, key, value) break } - nodes, err := tx.Get("nodes", "meta", args...) + + nodes, err := tx.Get(tableNodes, "meta", args...) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -445,13 +483,20 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *str } // DeleteNode is used to delete a given node by its ID. -func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMeta) error { - // TODO(partitions): use the provided entmeta +func (s *Store) DeleteNode(idx uint64, nodeName string, entMeta *structs.EnterpriseMeta) error { tx := s.db.WriteTxn(idx) defer tx.Abort() + // TODO(partition): double check all freshly modified state store functions + // that take an ent meta do this trick + + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Call the node deletion. - if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil { + if err := s.deleteNodeTxn(tx, idx, nodeName, entMeta); err != nil { return err } @@ -461,9 +506,9 @@ func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMet // deleteNodeCASTxn is used to try doing a node delete operation with a given // raft index. If the CAS index specified is not equal to the last observed index for // the given check, then the call is a noop, otherwise a normal check delete is invoked. -func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) (bool, error) { +func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string, entMeta *structs.EnterpriseMeta) (bool, error) { // Look up the node. - node, err := getNodeTxn(tx, nodeName) + node, err := getNodeTxn(tx, nodeName, entMeta) if err != nil { return false, err } @@ -479,7 +524,7 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) } // Call the actual deletion if the above passed. - if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil { + if err := s.deleteNodeTxn(tx, idx, nodeName, entMeta); err != nil { return false, err } @@ -488,9 +533,17 @@ func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) // deleteNodeTxn is the inner method used for removing a node from // the store within a given transaction. -func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { +func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta *structs.EnterpriseMeta) error { + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() + } + // Look up the node. - node, err := tx.First(tableNodes, indexID, Query{Value: nodeName}) + node, err := tx.First(tableNodes, indexID, Query{ + Value: nodeName, + EnterpriseMeta: *entMeta, + }) if err != nil { return fmt.Errorf("node lookup failed: %s", err) } @@ -499,7 +552,10 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { } // Delete all services associated with the node and update the service index. - services, err := tx.Get(tableServices, indexNode, Query{Value: nodeName}) + services, err := tx.Get(tableServices, indexNode, Query{ + Value: nodeName, + EnterpriseMeta: *entMeta, + }) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -525,7 +581,10 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { // Delete all checks associated with the node. This will invalidate // sessions as necessary. - checks, err := tx.Get(tableChecks, indexNode, Query{Value: nodeName}) + checks, err := tx.Get(tableChecks, indexNode, Query{ + Value: nodeName, + EnterpriseMeta: *entMeta, + }) if err != nil { return fmt.Errorf("failed check lookup: %s", err) } @@ -542,28 +601,28 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { } // Delete any coordinates associated with this node. - coords, err := tx.Get("coordinates", "node", nodeName) + coords, err := tx.Get(tableCoordinates, indexNode, Query{ + Value: nodeName, + EnterpriseMeta: *entMeta, + }) if err != nil { return fmt.Errorf("failed coordinate lookup: %s", err) } - var coordsToDelete []interface{} + var coordsToDelete []*structs.Coordinate for coord := coords.Next(); coord != nil; coord = coords.Next() { - coordsToDelete = append(coordsToDelete, coord) + coordsToDelete = append(coordsToDelete, coord.(*structs.Coordinate)) } for _, coord := range coordsToDelete { - if err := tx.Delete("coordinates", coord); err != nil { + if err := deleteCoordinateTxn(tx, idx, coord); err != nil { return fmt.Errorf("failed deleting coordinate: %s", err) } - if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } } // Delete the node and update the index. - if err := tx.Delete("nodes", node); err != nil { + if err := tx.Delete(tableNodes, node); err != nil { return fmt.Errorf("failed deleting node: %s", err) } - if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil { + if err := catalogUpdateNodesIndexes(tx, idx, entMeta); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -626,7 +685,11 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node // existing memdb transaction. func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { // Check for existing service - existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) + existing, err := tx.First(tableServices, indexID, NodeServiceQuery{ + EnterpriseMeta: svc.EnterpriseMeta, + Node: node, + Service: svc.ID, + }) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -651,7 +714,10 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool // That's always populated when we read from the state store. entry := svc.ToServiceNode(node) // Get the node - n, err := tx.First(tableNodes, indexID, Query{Value: node}) + n, err := tx.First(tableNodes, indexID, Query{ + Value: node, + EnterpriseMeta: svc.EnterpriseMeta, + }) if err != nil { return fmt.Errorf("failed node lookup: %s", err) } @@ -765,7 +831,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, // Get the table index. idx := catalogServicesMaxIndex(tx, entMeta) - if nodeIdx := maxIndexTxn(tx, "nodes"); nodeIdx > idx { + if nodeIdx := catalogNodesMaxIndex(tx, entMeta); nodeIdx > idx { idx = nodeIdx } @@ -775,7 +841,8 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, args = append(args, key, value) break } - nodes, err := tx.Get("nodes", "meta", args...) + // TODO(partitions): scope the meta index to a partition + nodes, err := tx.Get(tableNodes, "meta", args...) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -971,7 +1038,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, index string, q Query) (uint } // Fill in the node details. - results, err = parseServiceNodes(tx, ws, results) + results, err = parseServiceNodes(tx, ws, results, &q.EnterpriseMeta) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } @@ -1017,7 +1084,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string } // Fill in the node details. - results, err = parseServiceNodes(tx, ws, results) + results, err = parseServiceNodes(tx, ws, results, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } @@ -1087,7 +1154,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta * } // Fill in the node details. - results, err = parseServiceNodes(tx, ws, results) + results, err = parseServiceNodes(tx, ws, results, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } @@ -1096,10 +1163,10 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta * // parseServiceNodes iterates over a services query and fills in the node details, // returning a ServiceNodes slice. -func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { +func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes, entMeta *structs.EnterpriseMeta) (structs.ServiceNodes, error) { // We don't want to track an unlimited number of nodes, so we pull a // top-level watch to use as a fallback. - allNodes, err := tx.Get(tableNodes, indexID) + allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta) if err != nil { return nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -1114,7 +1181,10 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo s := sn.PartialClone() // Grab the corresponding node record. - watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node}) + watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{ + Value: sn.Node, + EnterpriseMeta: sn.EnterpriseMeta, + }) if err != nil { return nil, fmt.Errorf("failed node lookup: %s", err) } @@ -1128,6 +1198,7 @@ func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNo s.Address = node.Address s.Datacenter = node.Datacenter s.TaggedAddresses = node.TaggedAddresses + s.EnterpriseMeta.Merge(node.GetEnterpriseMeta()) s.NodeMeta = node.Meta results = append(results, s) @@ -1160,7 +1231,11 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs. } // Query the service - service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{ + EnterpriseMeta: *entMeta, + Node: nodeName, + Service: serviceID, + }) if err != nil { return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } @@ -1176,11 +1251,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st tx := s.db.Txn(false) defer tx.Abort() + // TODO: accept non-pointer value for entMeta + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() + } + // Get the table index. idx := catalogMaxIndex(tx, entMeta, false) // Query the node by node name - watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID}) + watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: nodeNameOrID, EnterpriseMeta: *entMeta}) if err != nil { return true, 0, nil, nil, fmt.Errorf("node lookup failed: %s", err) } @@ -1194,7 +1274,7 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *st } // Attempt to lookup the node by its node ID - iter, err := tx.Get("nodes", "uuid_prefix", resizeNodeLookupKey(nodeNameOrID)) + iter, err := tx.Get(tableNodes, "uuid_prefix", resizeNodeLookupKey(nodeNameOrID)) if err != nil { ws.Add(watchCh) // TODO(sean@): We could/should log an error re: the uuid_prefix lookup @@ -1347,7 +1427,11 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } // Delete any checks associated with the service. This will invalidate // sessions as necessary. - nsq := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta} + nsq := NodeServiceQuery{ + Node: nodeName, + Service: serviceID, + EnterpriseMeta: *entMeta, + } checks, err := tx.Get(tableChecks, indexNodeService, nsq) if err != nil { return fmt.Errorf("failed service check lookup: %s", err) @@ -1432,8 +1516,11 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { } // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node -func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error { - services, err := tx.Get(tableServices, indexNode, Query{Value: nodeID}) +func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string, entMeta *structs.EnterpriseMeta) error { + services, err := tx.Get(tableServices, indexNode, Query{ + Value: nodeID, + EnterpriseMeta: *entMeta.WildcardEnterpriseMetaForPartition(), + }) if err != nil { return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) } @@ -1483,7 +1570,11 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe // checks with no matching node or service. func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check - existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) + existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{ + EnterpriseMeta: hc.EnterpriseMeta, + Node: hc.Node, + CheckID: string(hc.CheckID), + }) if err != nil { return fmt.Errorf("failed health check lookup: %s", err) } @@ -1503,7 +1594,10 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc } // Get the node - node, err := tx.First(tableNodes, indexID, Query{Value: hc.Node}) + node, err := tx.First(tableNodes, indexID, Query{ + Value: hc.Node, + EnterpriseMeta: hc.EnterpriseMeta, + }) if err != nil { return fmt.Errorf("failed node lookup: %s", err) } @@ -1515,7 +1609,11 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc // If the check is associated with a service, check that we have // a registration for the service. if hc.ServiceID != "" { - service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{ + EnterpriseMeta: hc.EnterpriseMeta, + Node: hc.Node, + Service: hc.ServiceID, + }) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1543,7 +1641,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc } else { // Since the check has been modified, it impacts all services of node // Update the status for all the services associated with this node - err = updateAllServiceIndexesOfNode(tx, idx, hc.Node) + err = updateAllServiceIndexesOfNode(tx, idx, hc.Node, &hc.EnterpriseMeta) if err != nil { return err } @@ -1683,7 +1781,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, } ws.Add(iter.WatchCh()) - return parseChecksByNodeMeta(tx, ws, idx, iter, filters) + return parseChecksByNodeMeta(tx, ws, idx, iter, filters, entMeta) } // ChecksInState is used to query the state store for all checks @@ -1715,7 +1813,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters return 0, nil, err } - return parseChecksByNodeMeta(tx, ws, idx, iter, filters) + return parseChecksByNodeMeta(tx, ws, idx, iter, filters, entMeta) } func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) { @@ -1746,11 +1844,12 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru // parseChecksByNodeMeta is a helper function used to deduplicate some // repetitive code for returning health checks filtered by node metadata fields. func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet, - idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { + idx uint64, iter memdb.ResultIterator, filters map[string]string, + entMeta *structs.EnterpriseMeta) (uint64, structs.HealthChecks, error) { // We don't want to track an unlimited number of nodes, so we pull a // top-level watch to use as a fallback. - allNodes, err := tx.Get(tableNodes, indexID) + allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -1760,7 +1859,10 @@ func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet, var results structs.HealthChecks for check := iter.Next(); check != nil; check = iter.Next() { healthCheck := check.(*structs.HealthCheck) - watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{Value: healthCheck.Node}) + watchCh, node, err := tx.FirstWatch(tableNodes, indexID, Query{ + Value: healthCheck.Node, + EnterpriseMeta: healthCheck.EnterpriseMeta, + }) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -1871,7 +1973,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ return err } } else { - if err := updateAllServiceIndexesOfNode(tx, idx, existing.Node); err != nil { + if err := updateAllServiceIndexesOfNode(tx, idx, existing.Node, &existing.EnterpriseMeta); err != nil { return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err) } if err := catalogUpdateServicesIndexes(tx, idx, entMeta); err != nil { @@ -1999,7 +2101,10 @@ func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, con entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() } - q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + q := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } iter, err := tx.Get(tableServices, index, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) @@ -2114,7 +2219,7 @@ func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, con ws.Add(iter.WatchCh()) } - return parseCheckServiceNodes(tx, fallbackWS, idx, results, err) + return parseCheckServiceNodes(tx, fallbackWS, idx, results, entMeta, err) } // CheckServiceTagNodes is used to query all nodes and checks for a given @@ -2148,7 +2253,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags // Get the table index. idx := maxIndexForService(tx, serviceName, serviceExists, true, entMeta) - return parseCheckServiceNodes(tx, ws, idx, results, err) + return parseCheckServiceNodes(tx, ws, idx, results, entMeta, err) } // GatewayServices is used to query all services associated with a gateway @@ -2181,6 +2286,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru func parseCheckServiceNodes( tx ReadTxn, ws memdb.WatchSet, idx uint64, services structs.ServiceNodes, + entMeta *structs.EnterpriseMeta, err error) (uint64, structs.CheckServiceNodes, error) { if err != nil { return 0, nil, err @@ -2194,7 +2300,7 @@ func parseCheckServiceNodes( // We don't want to track an unlimited number of nodes, so we pull a // top-level watch to use as a fallback. - allNodes, err := tx.Get(tableNodes, indexID) + allNodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } @@ -2203,7 +2309,7 @@ func parseCheckServiceNodes( // We need a similar fallback for checks. Since services need the // status of node + service-specific checks, we pull in a top-level // watch over all checks. - allChecks, err := tx.Get(tableChecks, indexID) + allChecks, err := tx.Get(tableChecks, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } @@ -2212,7 +2318,10 @@ func parseCheckServiceNodes( results := make(structs.CheckServiceNodes, 0, len(services)) for _, sn := range services { // Retrieve the node. - watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{Value: sn.Node}) + watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{ + Value: sn.Node, + EnterpriseMeta: sn.EnterpriseMeta, + }) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -2226,7 +2335,11 @@ func parseCheckServiceNodes( // First add the node-level checks. These always apply to any // service on the node. var checks structs.HealthChecks - q := NodeServiceQuery{Node: sn.Node, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition()} + q := NodeServiceQuery{ + Node: sn.Node, + Service: "", // node checks have no service + EnterpriseMeta: *sn.EnterpriseMeta.WildcardEnterpriseMetaForPartition(), + } iter, err := tx.Get(tableChecks, indexNodeService, q) if err != nil { return 0, nil, err @@ -2237,7 +2350,11 @@ func parseCheckServiceNodes( } // Now add the service-specific checks. - q = NodeServiceQuery{Node: sn.Node, Service: sn.ServiceID, EnterpriseMeta: sn.EnterpriseMeta} + q = NodeServiceQuery{ + Node: sn.Node, + Service: sn.ServiceID, + EnterpriseMeta: sn.EnterpriseMeta, + } iter, err = tx.Get(tableChecks, indexNodeService, q) if err != nil { return 0, nil, err @@ -2264,11 +2381,18 @@ func (s *Store) NodeInfo(ws memdb.WatchSet, node string, entMeta *structs.Enterp tx := s.db.Txn(false) defer tx.Abort() + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Get the table index. idx := catalogMaxIndex(tx, entMeta, true) // Query the node by the passed node - nodes, err := tx.Get(tableNodes, indexID, Query{Value: node}) + nodes, err := tx.Get(tableNodes, indexID, Query{ + Value: node, + EnterpriseMeta: *entMeta, + }) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -2287,7 +2411,7 @@ func (s *Store) NodeDump(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui idx := catalogMaxIndex(tx, entMeta, true) // Fetch all of the registered nodes - nodes, err := tx.Get(tableNodes, indexID) + nodes, err := tx.Get(tableNodes, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } @@ -2321,7 +2445,7 @@ func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.Enterpris results = append(results, sn) } - return parseCheckServiceNodes(tx, nil, idx, results, err) + return parseCheckServiceNodes(tx, nil, idx, results, entMeta, err) } func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { @@ -2345,7 +2469,7 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, results = append(results, sn) } - return parseCheckServiceNodes(tx, nil, idx, results, err) + return parseCheckServiceNodes(tx, nil, idx, results, entMeta, err) } // parseNodes takes an iterator over a set of nodes and returns a struct @@ -2360,14 +2484,14 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, // We don't want to track an unlimited number of services, so we pull a // top-level watch to use as a fallback. - allServices, err := tx.Get(tableServices, indexID) + allServices, err := tx.Get(tableServices, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed services lookup: %s", err) } allServicesCh := allServices.WatchCh() // We need a similar fallback for checks. - allChecks, err := tx.Get(tableChecks, indexID) + allChecks, err := tx.Get(tableChecks, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } @@ -2381,6 +2505,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, dump := &structs.NodeInfo{ ID: node.ID, Node: node.Node, + Partition: node.Partition, Address: node.Address, TaggedAddresses: node.TaggedAddresses, Meta: node.Meta, @@ -2793,7 +2918,10 @@ func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind str maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex) // Look up nodes for gateway - q := Query{Value: mapping.Gateway.Name, EnterpriseMeta: mapping.Gateway.EnterpriseMeta} + q := Query{ + Value: mapping.Gateway.Name, + EnterpriseMeta: mapping.Gateway.EnterpriseMeta, + } gwServices, err := tx.Get(tableServices, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) @@ -3262,8 +3390,8 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS oldUpstreams := make(map[structs.ServiceName]bool) if e, ok := existing.(*structs.ServiceNode); ok { for _, u := range e.ServiceProxy.Upstreams { - upstreamMeta := structs.NewEnterpriseMetaInDefaultPartition(u.DestinationNamespace) - sn := structs.NewServiceName(u.DestinationName, &upstreamMeta) + upstreamMeta := e.NewEnterpriseMetaInPartition(u.DestinationNamespace) + sn := structs.NewServiceName(u.DestinationName, upstreamMeta) oldUpstreams[sn] = true } @@ -3278,8 +3406,8 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS } // TODO (freddy): Account for upstream datacenter - upstreamMeta := structs.NewEnterpriseMetaInDefaultPartition(u.DestinationNamespace) - upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta) + upstreamMeta := svc.NewEnterpriseMetaInPartition(u.DestinationNamespace) + upstream := structs.NewServiceName(u.DestinationName, upstreamMeta) obj, err := tx.First(tableMeshTopology, indexID, upstream, downstream) if err != nil { diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 0e267315ca..558f63e427 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -284,7 +284,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return nil }, Mutate: func(s *Store, tx *txn) error { - return s.deleteNodeTxn(tx, tx.Index, "node1") + return s.deleteNodeTxn(tx, tx.Index, "node1", nil) }, WantEvents: []stream.Event{ // Should publish deregistration events for all services @@ -1695,10 +1695,11 @@ type regOption func(req *structs.RegisterRequest) error func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequest { r := &structs.RegisterRequest{ - Datacenter: "dc1", - ID: "11111111-2222-3333-4444-555555555555", - Node: "node1", - Address: "10.10.10.10", + Datacenter: "dc1", + ID: "11111111-2222-3333-4444-555555555555", + Node: "node1", + Address: "10.10.10.10", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), Checks: structs.HealthChecks{ &structs.HealthCheck{ CheckID: "serf-health", @@ -1719,19 +1720,21 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc // note: don't pass opts or they might get applied twice! r := testNodeRegistration(t) r.Service = &structs.NodeService{ - ID: svc, - Service: svc, - Port: 8080, + ID: svc, + Service: svc, + Port: 8080, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), } r.Checks = append(r.Checks, &structs.HealthCheck{ - CheckID: types.CheckID("service:" + svc), - Name: "service:" + svc, - Node: "node1", - ServiceID: svc, - ServiceName: svc, - Type: "ttl", - Status: api.HealthPassing, + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + Node: "node1", + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: api.HealthPassing, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), }) for _, opt := range opts { err := opt(r) @@ -1753,6 +1756,7 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea csn := getPayloadCheckServiceNode(e.Payload) csn.Node.ID = "11111111-2222-3333-4444-555555555555" csn.Node.Address = "10.10.10.10" + csn.Node.Partition = structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty() for _, opt := range opts { if err := opt(&e); err != nil { diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 842e0f397a..e6700cc1a9 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -26,6 +26,15 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s } } +func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error { + // overall nodes index + if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // overall services index if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil { @@ -60,6 +69,29 @@ func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.Ent return nil } +func catalogInsertNode(tx WriteTxn, node *structs.Node) error { + // ensure that the Partition is always clear within the state store in OSS + node.Partition = "" + + // Insert the node and update the index. + if err := tx.Insert(tableNodes, node); err != nil { + return fmt.Errorf("failed inserting node: %s", err) + } + + if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta()); err != nil { + return err + } + + // Update the node's service indexes as the node information is included + // in health queries and we would otherwise miss node updates in some cases + // for those queries. + if err := updateAllServiceIndexesOfNode(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta()); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { // Insert the service and update the index if err := tx.Insert(tableServices, svc); err != nil { @@ -81,6 +113,10 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { return nil } +func catalogNodesMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { + return maxIndexTxn(tx, tableNodes) +} + func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, tableServices) } @@ -107,16 +143,16 @@ func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (i func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexTxn(tx, "nodes", tableServices, tableChecks) + return maxIndexTxn(tx, tableNodes, tableServices, tableChecks) } - return maxIndexTxn(tx, "nodes", tableServices) + return maxIndexTxn(tx, tableNodes, tableServices) } func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexWatchTxn(tx, ws, "nodes", tableServices, tableChecks) + return maxIndexWatchTxn(tx, ws, tableNodes, tableServices, tableChecks) } - return maxIndexWatchTxn(tx, ws, "nodes", tableServices) + return maxIndexWatchTxn(tx, ws, tableNodes, tableServices) } func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 90c265e4ae..c259d42aed 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -185,7 +185,25 @@ func testIndexerTableNodes() map[string]indexerTestCase { source: &structs.Node{Node: "NoDeId"}, expected: []byte("nodeid\x00"), }, + prefix: []indexValue{ + { + source: (*structs.EnterpriseMeta)(nil), + expected: nil, + }, + { + source: structs.EnterpriseMeta{}, + expected: nil, + }, + { + source: Query{Value: "NoDeId"}, + expected: []byte("nodeid\x00"), + }, + }, }, + // TODO: uuid + // TODO: meta + + // TODO(partitions): fix schema tests for tables that reference nodes too } } diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index a57c8cf7d2..a161689668 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -38,9 +38,10 @@ func nodesTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: indexerSingle{ - readIndex: indexFromQuery, - writeIndex: indexFromNode, + Indexer: indexerSingleWithPrefix{ + readIndex: indexFromQuery, + writeIndex: indexFromNode, + prefixIndex: prefixIndexFromQueryNoNamespace, }, }, "uuid": { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 7f68fdb83b..d307ef56ba 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -171,6 +171,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { ID: nodeID, Node: "node1", Address: "1.2.3.4", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), TaggedAddresses: map[string]string{"hello": "world"}, Meta: map[string]string{"somekey": "somevalue"}, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1}, @@ -1316,7 +1317,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Indexes were updated. - for _, tbl := range []string{"nodes", tableServices, tableChecks} { + for _, tbl := range []string{tableNodes, tableServices, tableChecks} { if idx := s.maxIndex(tbl); idx != 3 { t.Fatalf("bad index: %d (%s)", idx, tbl) } @@ -1327,7 +1328,7 @@ func TestStateStore_DeleteNode(t *testing.T) { if err := s.DeleteNode(4, "node1", nil); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex("nodes"); idx != 3 { + if idx := s.maxIndex(tableNodes); idx != 3 { t.Fatalf("bad index: %d", idx) } } @@ -4236,7 +4237,8 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { // Check that our result matches what we expect. expect := structs.NodeDump{ &structs.NodeInfo{ - Node: "node1", + Node: "node1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Checks: structs.HealthChecks{ &structs.HealthCheck{ Node: "node1", @@ -4293,7 +4295,8 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { }, }, &structs.NodeInfo{ - Node: "node2", + Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Checks: structs.HealthChecks{ &structs.HealthCheck{ Node: "node2", @@ -7518,3 +7521,46 @@ func TestProtocolForIngressGateway(t *testing.T) { }) } } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} + +func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) { + t.Helper() + + all := dumpMaxIndexes(t, tx) + + for _, index := range skip { + if _, ok := all[index]; ok { + delete(all, index) + } else { + t.Logf("index %q isn't even set; probably test assertion isn't relevant anymore", index) + } + } + + require.Equal(t, expect, all) + + // TODO + // for _, index := range indexes { + // require.Equal(t, expectIndex, maxIndexTxn(tx, index), + // "index %s has the wrong value", index) + // } +} + +func dumpMaxIndexes(t *testing.T, tx ReadTxn) map[string]uint64 { + out := make(map[string]uint64) + + iter, err := tx.Get(tableIndex, "id") + require.NoError(t, err) + + for entry := iter.Next(); entry != nil; entry = iter.Next() { + if idx, ok := entry.(*IndexEntry); ok { + out[idx.Key] = idx.Value + } + } + return out +} diff --git a/agent/consul/state/coordinate.go b/agent/consul/state/coordinate.go index 620f87c1d5..f294adb7b6 100644 --- a/agent/consul/state/coordinate.go +++ b/agent/consul/state/coordinate.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "strings" "github.com/hashicorp/go-memdb" @@ -9,39 +10,88 @@ import ( "github.com/hashicorp/consul/lib" ) +const tableCoordinates = "coordinates" + +func indexFromCoordinate(raw interface{}) ([]byte, error) { + c, ok := raw.(*structs.Coordinate) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.Coordinate index", raw) + } + + if c.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(c.Node)) + b.String(strings.ToLower(c.Segment)) + return b.Bytes(), nil +} + +func indexNodeFromCoordinate(raw interface{}) ([]byte, error) { + c, ok := raw.(*structs.Coordinate) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.Coordinate index", raw) + } + + if c.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(c.Node)) + return b.Bytes(), nil +} + +func indexFromCoordinateQuery(raw interface{}) ([]byte, error) { + q, ok := raw.(CoordinateQuery) + if !ok { + return nil, fmt.Errorf("unexpected type %T for CoordinateQuery index", raw) + } + + if q.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(q.Node)) + b.String(strings.ToLower(q.Segment)) + return b.Bytes(), nil +} + +type CoordinateQuery struct { + Node string + Segment string + Partition string +} + +func (c CoordinateQuery) PartitionOrDefault() string { + return structs.PartitionOrDefault(c.Partition) +} + // coordinatesTableSchema returns a new table schema used for storing // network coordinates. func coordinatesTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "coordinates", + Name: tableCoordinates, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, - Indexer: &memdb.CompoundIndex{ - // AllowMissing is required since we allow - // Segment to be an empty string. - AllowMissing: true, - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "Segment", - Lowercase: true, - }, - }, + Indexer: indexerSingleWithPrefix{ + readIndex: indexFromCoordinateQuery, + writeIndex: indexFromCoordinate, + prefixIndex: prefixIndexFromQueryNoNamespace, }, }, - "node": { - Name: "node", + indexNode: { + Name: indexNode, AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexNodeFromCoordinate, }, }, }, @@ -50,7 +100,7 @@ func coordinatesTableSchema() *memdb.TableSchema { // Coordinates is used to pull all the coordinates from the snapshot. func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) { - iter, err := s.tx.Get("coordinates", "id") + iter, err := s.tx.Get(tableCoordinates, indexID) if err != nil { return nil, err } @@ -68,28 +118,31 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { continue } - if err := s.tx.Insert("coordinates", update); err != nil { + if err := ensureCoordinateTxn(s.tx, idx, update); err != nil { return fmt.Errorf("failed restoring coordinate: %s", err) } } - if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - return nil } // Coordinate returns a map of coordinates for the given node, indexed by // network segment. -func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) { - // TODO(partitions): use the provided entmeta +func (s *Store) Coordinate(ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) { tx := s.db.Txn(false) defer tx.Abort() - tableIdx := maxIndexTxn(tx, "coordinates") + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } - iter, err := tx.Get("coordinates", "node", node) + tableIdx := coordinatesMaxIndex(tx, entMeta) + + iter, err := tx.Get(tableCoordinates, indexNode, Query{ + Value: node, + EnterpriseMeta: *entMeta, + }) if err != nil { return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err) } @@ -104,16 +157,20 @@ func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.Enterprise } // Coordinates queries for all nodes with coordinates. -func (s *Store) Coordinates(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) { - // TODO(partitions): use the provided entmeta +func (s *Store) Coordinates(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) { tx := s.db.Txn(false) defer tx.Abort() + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + // Get the table index. - idx := maxIndexTxn(tx, "coordinates") + idx := coordinatesMaxIndex(tx, entMeta) // Pull all the coordinates. - iter, err := tx.Get("coordinates", "id") + iter, err := tx.Get(tableCoordinates, indexID+"_prefix", entMeta) if err != nil { return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err) } @@ -140,6 +197,8 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e continue } + entMeta := update.GetEnterpriseMeta() + // Since the cleanup of coordinates is tied to deletion of // nodes, we silently drop any updates for nodes that we don't // know about. This might be possible during normal operation @@ -148,7 +207,10 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e // don't carefully sequence this, and since it will fix itself // on the next coordinate update from that node, we don't return // an error or log anything. - node, err := tx.First(tableNodes, indexID, Query{Value: update.Node}) + node, err := tx.First(tableNodes, indexID, Query{ + Value: update.Node, + EnterpriseMeta: *entMeta, + }) if err != nil { return fmt.Errorf("failed node lookup: %s", err) } @@ -156,15 +218,22 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e continue } - if err := tx.Insert("coordinates", update); err != nil { + if err := ensureCoordinateTxn(tx, idx, update); err != nil { return fmt.Errorf("failed inserting coordinate: %s", err) } } - // Update the index. - if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - return tx.Commit() } + +func deleteCoordinateTxn(tx WriteTxn, idx uint64, coord *structs.Coordinate) error { + if err := tx.Delete(tableCoordinates, coord); err != nil { + return fmt.Errorf("failed deleting coordinate: %s", err) + } + + if err := updateCoordinatesIndexes(tx, idx, coord.GetEnterpriseMeta()); err != nil { + return fmt.Errorf("failed updating coordinate index: %s", err) + } + + return nil +} diff --git a/agent/consul/state/coordinate_oss.go b/agent/consul/state/coordinate_oss.go new file mode 100644 index 0000000000..9e76f0cf9e --- /dev/null +++ b/agent/consul/state/coordinate_oss.go @@ -0,0 +1,36 @@ +// +build !consulent + +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" +) + +func coordinatesMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { + return maxIndexTxn(tx, tableCoordinates) +} + +func updateCoordinatesIndexes(tx WriteTxn, idx uint64, entMeta *structs.EnterpriseMeta) error { + // Update the index. + if err := indexUpdateMaxTxn(tx, idx, tableCoordinates); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} + +func ensureCoordinateTxn(tx WriteTxn, idx uint64, coord *structs.Coordinate) error { + // ensure that the Partition is always empty within the state store + coord.Partition = "" + + if err := tx.Insert(tableCoordinates, coord); err != nil { + return fmt.Errorf("failed inserting coordinate: %s", err) + } + + if err := updateCoordinatesIndexes(tx, idx, coord.GetEnterpriseMeta()); err != nil { + return fmt.Errorf("failed updating coordinate index: %s", err) + } + + return nil +} diff --git a/agent/consul/state/coordinate_oss_test.go b/agent/consul/state/coordinate_oss_test.go new file mode 100644 index 0000000000..d7a7d805d1 --- /dev/null +++ b/agent/consul/state/coordinate_oss_test.go @@ -0,0 +1,54 @@ +// +build !consulent + +package state + +import "github.com/hashicorp/consul/agent/structs" + +func testIndexerTableCoordinates() map[string]indexerTestCase { + return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: CoordinateQuery{ + Node: "NoDeId", + Segment: "SeGmEnT", + }, + expected: []byte("nodeid\x00segment\x00"), + }, + write: indexValue{ + source: &structs.Coordinate{ + Node: "NoDeId", + Segment: "SeGmEnT", + }, + expected: []byte("nodeid\x00segment\x00"), + }, + prefix: []indexValue{ + { + source: (*structs.EnterpriseMeta)(nil), + expected: nil, + }, + { + source: structs.EnterpriseMeta{}, + expected: nil, + }, + { + source: Query{Value: "NoDeId"}, + expected: []byte("nodeid\x00"), + }, + }, + }, + indexNode: { + read: indexValue{ + source: Query{ + Value: "NoDeId", + }, + expected: []byte("nodeid\x00"), + }, + write: indexValue{ + source: &structs.Coordinate{ + Node: "NoDeId", + }, + expected: []byte("nodeid\x00"), + }, + }, + } +} diff --git a/agent/consul/state/coordinate_test.go b/agent/consul/state/coordinate_test.go index 6a9f446345..3a28d199b9 100644 --- a/agent/consul/state/coordinate_test.go +++ b/agent/consul/state/coordinate_test.go @@ -41,19 +41,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // a per-node coordinate for a nonexistent node doesn't do anything bad. ws := memdb.NewWatchSet() idx, all, err := s.Coordinates(ws, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 0 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(0), idx, "bad index") require.Nil(t, all) coordinateWs := memdb.NewWatchSet() _, coords, err := s.Coordinate(coordinateWs, "nope", nil) - if err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, err) require.Equal(t, lib.CoordinateSet{}, coords) // Make an update for nodes that don't exist and make sure they get @@ -68,40 +62,26 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { Coord: generateRandomCoordinate(), }, } - if err := s.CoordinateBatchUpdate(1, updates); err != nil { - t.Fatalf("err: %s", err) - } - if watchFired(ws) || watchFired(coordinateWs) { - t.Fatalf("bad") - } + require.NoError(t, s.CoordinateBatchUpdate(1, updates)) + require.False(t, watchFired(ws) || watchFired(coordinateWs)) - // Should still be empty, though applying an empty batch does bump + // Should still be empty, though applying an empty batch does NOT bump // the table index. ws = memdb.NewWatchSet() idx, all, err = s.Coordinates(ws, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 1 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(0), idx, "bad index") require.Nil(t, all) coordinateWs = memdb.NewWatchSet() idx, _, err = s.Coordinate(coordinateWs, "node1", nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 1 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(0), idx, "bad index") // Register the nodes then do the update again. testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 2, "node2") - if err := s.CoordinateBatchUpdate(3, updates); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(3, updates)) if !watchFired(ws) || !watchFired(coordinateWs) { t.Fatalf("bad") } @@ -109,12 +89,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Should go through now. ws = memdb.NewWatchSet() idx, all, err = s.Coordinates(ws, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(3), idx, "bad index") require.Equal(t, updates, all) // Also verify the per-node coordinate interface. @@ -122,12 +98,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { for i, update := range updates { nodeWs[i] = memdb.NewWatchSet() idx, coords, err := s.Coordinate(nodeWs[i], update.Node, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(3), idx, "bad index") expected := lib.CoordinateSet{ "": update.Coord, } @@ -136,9 +108,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Update the coordinate for one of the nodes. updates[1].Coord = generateRandomCoordinate() - if err := s.CoordinateBatchUpdate(4, updates); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(4, updates)) if !watchFired(ws) { t.Fatalf("bad") } @@ -150,23 +120,15 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Verify it got applied. idx, all, err = s.Coordinates(nil, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(4), idx, "bad index") require.Equal(t, updates, all) // And check the per-node coordinate version of the same thing. for _, update := range updates { idx, coords, err := s.Coordinate(nil, update.Node, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(4), idx, "bad index") expected := lib.CoordinateSet{ "": update.Coord, } @@ -180,19 +142,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { Coord: &coordinate.Coordinate{Height: math.NaN()}, }, } - if err := s.CoordinateBatchUpdate(5, badUpdates); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(5, badUpdates)) - // Verify we are at the previous state, though the empty batch does bump - // the table index. + // Verify we are at the previous state, and verify that the empty batch + // does NOT bump the table index. idx, all, err = s.Coordinates(nil, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(4), idx, "bad index") require.Equal(t, updates, all) } @@ -213,15 +169,11 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { Coord: generateRandomCoordinate(), }, } - if err := s.CoordinateBatchUpdate(2, updates); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(2, updates)) // Make sure it's in there. _, coords, err := s.Coordinate(nil, "node1", nil) - if err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, err) expected := lib.CoordinateSet{ "alpha": updates[0].Coord, "beta": updates[1].Coord, @@ -229,25 +181,17 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { require.Equal(t, expected, coords) // Now delete the node. - if err := s.DeleteNode(3, "node1", nil); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.DeleteNode(3, "node1", nil)) // Make sure the coordinate is gone. _, coords, err = s.Coordinate(nil, "node1", nil) - if err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, err) require.Equal(t, lib.CoordinateSet{}, coords) // Make sure the index got updated. idx, all, err := s.Coordinates(nil, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(3), idx, "bad index") require.Nil(t, all) } @@ -267,9 +211,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { Coord: generateRandomCoordinate(), }, } - if err := s.CoordinateBatchUpdate(3, updates); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(3, updates)) // Manually put a bad coordinate in for node3. testRegisterNode(t, s, 4, "node3") @@ -278,9 +220,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { Coord: &coordinate.Coordinate{Height: math.NaN()}, } tx := s.db.WriteTxn(5) - if err := tx.Insert("coordinates", badUpdate); err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(t, tx.Insert("coordinates", badUpdate)) require.NoError(t, tx.Commit()) // Snapshot the coordinates. @@ -298,18 +238,13 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { Coord: generateRandomCoordinate(), }, } - if err := s.CoordinateBatchUpdate(5, trash); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, s.CoordinateBatchUpdate(5, trash)) // Verify the snapshot. - if idx := snap.LastIndex(); idx != 4 { - t.Fatalf("bad index: %d", idx) - } + require.Equal(t, uint64(4), snap.LastIndex(), "bad index") iter, err := snap.Coordinates() - if err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, err) + var dump structs.Coordinates for coord := iter.Next(); coord != nil; coord = iter.Next() { dump = append(dump, coord.(*structs.Coordinate)) @@ -319,30 +254,20 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { // the read side. require.Equal(t, append(updates, badUpdate), dump) - // Restore the values into a new state store. - func() { + runStep(t, "restore the values into a new state store", func(t *testing.T) { s := testStateStore(t) restore := s.Restore() - if err := restore.Coordinates(6, dump); err != nil { - t.Fatalf("err: %s", err) - } + require.NoError(t, restore.Coordinates(6, dump)) restore.Commit() // Read the restored coordinates back out and verify that they match. idx, res, err := s.Coordinates(nil, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } + require.NoError(t, err) + require.Equal(t, uint64(6), idx, "bad index") require.Equal(t, updates, res) // Check that the index was updated (note that it got passed // in during the restore). - if idx := s.maxIndex("coordinates"); idx != 6 { - t.Fatalf("bad index: %d", idx) - } - }() - + require.Equal(t, uint64(6), s.maxIndex("coordinates"), "bad index") + }) } diff --git a/agent/consul/state/indexer.go b/agent/consul/state/indexer.go index 46c4965591..044306d8e1 100644 --- a/agent/consul/state/indexer.go +++ b/agent/consul/state/indexer.go @@ -2,6 +2,7 @@ package state import ( "bytes" + "encoding/binary" "errors" "fmt" ) @@ -113,6 +114,15 @@ func (b *indexBuilder) String(v string) { (*bytes.Buffer)(b).WriteString(null) } +func (b *indexBuilder) Int64(v int64) { + const size = binary.MaxVarintLen64 + + // Get the value and encode it + buf := make([]byte, size) + binary.PutVarint(buf, v) + b.Raw(buf) +} + // Raw appends the bytes without a null terminator to the buffer. Raw should // only be used when v has a fixed length, or when building the last segment of // a prefix index. diff --git a/agent/consul/state/query.go b/agent/consul/state/query.go index 4ca1051f4f..341f995808 100644 --- a/agent/consul/state/query.go +++ b/agent/consul/state/query.go @@ -40,6 +40,17 @@ func indexFromQuery(arg interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexFromServiceNameAsString(arg interface{}) ([]byte, error) { + sn, ok := arg.(structs.ServiceName) + if !ok { + return nil, fmt.Errorf("unexpected type %T for ServiceName index", arg) + } + + var b indexBuilder + b.String(strings.ToLower(sn.String())) + return b.Bytes(), nil +} + // uuidStringToBytes is a modified version of memdb.UUIDFieldIndex.parseString func uuidStringToBytes(uuid string) ([]byte, error) { l := len(uuid) diff --git a/agent/consul/state/query_oss.go b/agent/consul/state/query_oss.go index 66f9e37ec0..98091f0f17 100644 --- a/agent/consul/state/query_oss.go +++ b/agent/consul/state/query_oss.go @@ -23,3 +23,22 @@ func prefixIndexFromQuery(arg interface{}) ([]byte, error) { return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg) } + +func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) { + return prefixIndexFromQuery(arg) +} + +func prefixIndexFromServiceNameAsString(arg interface{}) ([]byte, error) { + var b indexBuilder + switch v := arg.(type) { + case *structs.EnterpriseMeta: + return nil, nil + case structs.EnterpriseMeta: + return nil, nil + case structs.ServiceName: + b.String(strings.ToLower(v.String())) + return b.Bytes(), nil + } + + return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg) +} diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index f90b28027d..8b1c6507e9 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -36,14 +36,18 @@ func TestNewDBSchema_Indexers(t *testing.T) { require.NoError(t, schema.Validate()) var testcases = map[string]func() map[string]indexerTestCase{ - tableACLPolicies: testIndexerTableACLPolicies, - tableACLRoles: testIndexerTableACLRoles, + // acl + tableACLPolicies: testIndexerTableACLPolicies, + tableACLRoles: testIndexerTableACLRoles, + // catalog tableChecks: testIndexerTableChecks, tableServices: testIndexerTableServices, tableNodes: testIndexerTableNodes, - tableConfigEntries: testIndexerTableConfigEntries, + tableCoordinates: testIndexerTableCoordinates, tableMeshTopology: testIndexerTableMeshTopology, tableGatewayServices: testIndexerTableGatewayServices, + // config + tableConfigEntries: testIndexerTableConfigEntries, } addEnterpriseIndexerTestCases(testcases) diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index bcc9ad5933..8e7da5bcab 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -57,18 +57,37 @@ func testStateStore(t *testing.T) *Store { } func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) { - testRegisterNodeWithMeta(t, s, idx, nodeID, nil) + testRegisterNodeOpts(t, s, idx, nodeID) } // testRegisterNodeWithChange registers a node and ensures it gets different from previous registration func testRegisterNodeWithChange(t *testing.T, s *Store, idx uint64, nodeID string) { - testRegisterNodeWithMeta(t, s, idx, nodeID, map[string]string{ + testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(map[string]string{ "version": fmt.Sprint(idx), - }) + })) } func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) { - node := &structs.Node{Node: nodeID, Meta: meta} + testRegisterNodeOpts(t, s, idx, nodeID, regNodeWithMeta(meta)) +} + +type regNodeOption func(*structs.Node) error + +func regNodeWithMeta(meta map[string]string) func(*structs.Node) error { + return func(node *structs.Node) error { + node.Meta = meta + return nil + } +} + +func testRegisterNodeOpts(t *testing.T, s *Store, idx uint64, nodeID string, opts ...regNodeOption) { + node := &structs.Node{Node: nodeID} + for _, opt := range opts { + if err := opt(node); err != nil { + t.Fatalf("err: %s", err) + } + } + if err := s.EnsureNode(idx, node); err != nil { t.Fatalf("err: %s", err) } @@ -283,7 +302,7 @@ func TestStateStore_maxIndex(t *testing.T) { testRegisterNode(t, s, 1, "bar") testRegisterService(t, s, 2, "foo", "consul") - if max := s.maxIndex("nodes", tableServices); max != 2 { + if max := s.maxIndex(tableNodes, tableServices); max != 2 { t.Fatalf("bad max: %d", max) } } @@ -295,12 +314,12 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) { testRegisterNode(t, s, 1, "bar") tx := s.db.WriteTxnRestore() - if err := indexUpdateMaxTxn(tx, 3, "nodes"); err != nil { + if err := indexUpdateMaxTxn(tx, 3, tableNodes); err != nil { t.Fatalf("err: %s", err) } require.NoError(t, tx.Commit()) - if max := s.maxIndex("nodes"); max != 3 { + if max := s.maxIndex(tableNodes); max != 3 { t.Fatalf("bad max: %d", max) } } diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index f4306d0fb4..bb682efd37 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -149,11 +149,13 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs var entry *structs.Node var err error + // TODO(partitions): change these errors to include node partitions when printing + getNode := func() (*structs.Node, error) { if op.Node.ID != "" { return getNodeIDTxn(tx, op.Node.ID) } else { - return getNodeTxn(tx, op.Node.Node) + return getNodeTxn(tx, op.Node.Node, op.Node.GetEnterpriseMeta()) } } @@ -180,11 +182,11 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs entry, err = getNode() case api.NodeDelete: - err = s.deleteNodeTxn(tx, idx, op.Node.Node) + err = s.deleteNodeTxn(tx, idx, op.Node.Node, op.Node.GetEnterpriseMeta()) case api.NodeDeleteCAS: var ok bool - ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node) + ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node, op.Node.GetEnterpriseMeta()) if !ok && err == nil { err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node) } diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index 9df2cac232..17adc2bc36 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -320,7 +320,8 @@ func TestStateStore_Txn_Service(t *testing.T) { // Make sure it looks as expected. expectedServices := &structs.NodeServices{ Node: &structs.Node{ - Node: "node1", + Node: "node1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), RaftIndex: structs.RaftIndex{ CreateIndex: 1, ModifyIndex: 1, diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 3599589ee6..a41740b03d 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -115,6 +115,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc1", Address: "3.4.5.6", RaftIndex: raftIndex(ids, "reg2", "reg2"), @@ -145,6 +146,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc1", Address: "1.2.3.4", RaftIndex: raftIndex(ids, "reg3", "reg3"), @@ -194,6 +196,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc1", Address: "1.2.3.4", RaftIndex: raftIndex(ids, "reg3", "reg3"), @@ -465,6 +468,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node1", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc2", Address: "3.4.5.6", RaftIndex: raftIndex(ids, "reg2", "reg2"), @@ -495,6 +499,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc2", Address: "1.2.3.4", RaftIndex: raftIndex(ids, "reg3", "reg3"), @@ -544,6 +549,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ Node: "node2", + Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Datacenter: "dc2", Address: "1.2.3.4", RaftIndex: raftIndex(ids, "reg3", "reg3"), diff --git a/agent/structs/structs.go b/agent/structs/structs.go index d598385f54..f1013ddcaf 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -437,7 +437,7 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool { // Check if any of the node-level fields are being changed. if r.ID != node.ID || r.Node != node.Node || - // TODO(partitions): do we need to check partition here? + r.PartitionOrDefault() != node.PartitionOrDefault() || r.Address != node.Address || r.Datacenter != node.Datacenter || !reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) || diff --git a/api/catalog_test.go b/api/catalog_test.go index 2645ae5a05..093d6d974b 100644 --- a/api/catalog_test.go +++ b/api/catalog_test.go @@ -51,6 +51,7 @@ func TestAPI_CatalogNodes(t *testing.T) { { ID: s.Config.NodeID, Node: s.Config.NodeName, + Partition: defaultPartition, Address: "127.0.0.1", Datacenter: "dc1", TaggedAddresses: map[string]string{ diff --git a/api/txn_test.go b/api/txn_test.go index cf49fb7d08..3d69baff10 100644 --- a/api/txn_test.go +++ b/api/txn_test.go @@ -170,6 +170,7 @@ func TestAPI_ClientTxn(t *testing.T) { Node: &Node{ ID: nodeID, Node: "foo", + Partition: defaultPartition, Address: "2.2.2.2", Datacenter: "dc1", CreateIndex: ret.Results[2].Node.CreateIndex, @@ -269,6 +270,7 @@ func TestAPI_ClientTxn(t *testing.T) { Node: &Node{ ID: s.Config.NodeID, Node: s.Config.NodeName, + Partition: defaultPartition, Address: "127.0.0.1", Datacenter: "dc1", TaggedAddresses: map[string]string{ @@ -283,7 +285,7 @@ func TestAPI_ClientTxn(t *testing.T) { }, }, } - require.Equal(r, ret.Results, expected) + require.Equal(r, expected, ret.Results) }) // Sanity check using the regular GET API.