diff --git a/.changelog/12399.txt b/.changelog/12399.txt new file mode 100644 index 0000000000..10f4f87cf8 --- /dev/null +++ b/.changelog/12399.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +catalog: Add per-node indexes to reduce watchset firing for unrelated nodes and services. +``` \ No newline at end of file diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index d76f87bf62..06990011ed 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -17,9 +17,15 @@ import ( "github.com/hashicorp/consul/types" ) -// indexServiceExtinction keeps track of the last raft index when the last instance -// of any service was unregistered. This is used by blocking queries on missing services. -const indexServiceExtinction = "service_last_extinction" +const ( + // indexServiceExtinction keeps track of the last raft index when the last instance + // of any service was unregistered. This is used by blocking queries on missing services. + indexServiceExtinction = "service_last_extinction" + + // indexNodeExtinction keeps track of the last raft index at which any node + // was unregistered. This is used by blocking queries on missing nodes.
+ indexNodeExtinction = "node_last_extinction" +) const ( // minUUIDLookupLen is used as a minimum length of a node name required before @@ -414,8 +420,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod // We are actually renaming a node, remove its reference first err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName) if err != nil { - return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s", - node.ID, node.Address, n.Node, node.Node) + return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s: %w", + node.ID, node.Address, n.Node, node.Node, err) } } } else { @@ -764,6 +770,15 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta return fmt.Errorf("failed updating index: %s", err) } + // Clean up node entry from index table + if err := tx.Delete(tableIndex, &IndexEntry{Key: nodeIndexName(nodeName, entMeta, node.PeerName)}); err != nil { + return fmt.Errorf("failed deleting nodeIndex %q: %w", nodeIndexName(nodeName, entMeta, node.PeerName), err) + } + + if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta, node.PeerName); err != nil { + return err + } + if peerName == "" { // Invalidate any sessions for this node. toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault()) @@ -1683,9 +1698,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() } - // Get the table index. - idx := catalogMaxIndex(tx, entMeta, peerName, false) - // Query the node by node name watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{ Value: nodeNameOrID, @@ -1712,16 +1724,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac }) if err != nil { ws.Add(watchCh) - // TODO(sean@): We could/should log an error re: the uuid_prefix lookup - // failing once a logger has been introduced to the catalog. 
- return true, 0, nil, nil, nil + idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName) + return true, idx, nil, nil, nil } n = iter.Next() if n == nil { // No nodes matched, even with the Node ID: add a watch on the node name. ws.Add(watchCh) - return true, 0, nil, nil, nil + idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName) + return true, idx, nil, nil, nil } idWatchCh := iter.WatchCh() @@ -1745,6 +1757,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac } ws.Add(services.WatchCh()) + // Get the per-node index. + idx := catalogNodeMaxIndex(tx, nodeName, entMeta, peerName) + return false, idx, node, services, nil } @@ -1902,10 +1917,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st svc := service.(*structs.ServiceNode) if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil { - return err + return fmt.Errorf("failed updating services indexes: %w", err) } if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil { - return err + return fmt.Errorf("failed updating service-kind indexes: %w", err) + } + // Update the node indexes as the service information is included in node catalog queries. + if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil { + return fmt.Errorf("failed updating nodes indexes: %w", err) + } + if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil { + return fmt.Errorf("failed updating node indexes: %w", err) } name := svc.CompoundServiceName() @@ -1930,7 +1952,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st _, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName) if err == nil && serviceIndex != nil { // we found service.
index, garbage collect it - if errW := tx.Delete(tableIndex, serviceIndex); errW != nil { + if err := tx.Delete(tableIndex, serviceIndex); err != nil { return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) } } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index b0c0c53376..706a265323 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -7,7 +7,7 @@ import ( "fmt" "strings" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -24,8 +24,12 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *acl.EnterpriseMeta, peerN return peeredIndexEntryName(base, peerName) } +func nodeIndexName(name string, _ *acl.EnterpriseMeta, peerName string) string { + return peeredIndexEntryName(fmt.Sprintf("node.%s", name), peerName) +} + func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { - // overall nodes index + // overall nodes index for snapshot and ListNodes RPC if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -38,12 +42,22 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, p return nil } +// catalogUpdateNodeIndexes upserts the max index for a single node +func catalogUpdateNodeIndexes(tx WriteTxn, idx uint64, nodeName string, _ *acl.EnterpriseMeta, peerName string) error { + // per-node index + if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil, peerName)); err != nil { + return fmt.Errorf("failed updating node index: %w", err) + } + + return nil +} + // catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels // of granularity (no-op if `idx` is lower than what exists for that index key): // - all services // - all services in a specified peer (including internal) 
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { - // overall services index + // overall services index for snapshot if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil { return fmt.Errorf("failed updating index for services table: %w", err) } @@ -84,14 +98,16 @@ func catalogUpdateServiceIndexes(tx WriteTxn, idx uint64, serviceName string, _ } func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { - if err := indexUpdateMaxTxn(tx, idx, indexServiceExtinction); err != nil { - return fmt.Errorf("failed updating missing service extinction index: %w", err) - } - // update the peer index if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil { return fmt.Errorf("failed updating missing service extinction peered index: %w", err) } + return nil +} +func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { + if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexNodeExtinction, peerName)); err != nil { + return fmt.Errorf("failed updating missing node extinction peered index: %w", err) + } return nil } @@ -105,7 +121,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error { } if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil { - return err + return fmt.Errorf("failed updating nodes indexes: %w", err) + } + if err := catalogUpdateNodeIndexes(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta(), node.PeerName); err != nil { + return fmt.Errorf("failed updating node indexes: %w", err) } // Update the node's service indexes as the node information is included @@ -125,15 +144,23 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { } if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil { - 
return err + return fmt.Errorf("failed updating services indexes: %w", err) } if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil { - return err + return fmt.Errorf("failed updating service indexes: %w", err) } if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil { - return err + return fmt.Errorf("failed updating service-kind indexes: %w", err) + } + + // Update the node indexes as the service information is included in node catalog queries. + if err := catalogUpdateNodesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil { + return fmt.Errorf("failed updating nodes indexes: %w", err) + } + if err := catalogUpdateNodeIndexes(tx, svc.ModifyIndex, svc.Node, &svc.EnterpriseMeta, svc.PeerName); err != nil { + return fmt.Errorf("failed updating node indexes: %w", err) } return nil @@ -143,6 +170,14 @@ func catalogNodesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) ui return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName)) } +func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *acl.EnterpriseMeta, peerName string) uint64 { + return maxIndexTxn(tx, nodeIndexName(nodeName, nil, peerName)) +} + +func catalogNodeLastExtinctionIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 { + return maxIndexTxn(tx, peeredIndexEntryName(indexNodeExtinction, peerName)) +} + func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 { return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName)) } @@ -185,7 +220,6 @@ func catalogMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string, checks } func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 { - // TODO(peering_indexes): pipe peerName here if checks { return maxIndexWatchTxn(tx, ws, peeredIndexEntryName(tableChecks, 
peerName), @@ -200,7 +234,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, } func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { - // update the universal index entry + // update the overall index entry for snapshot if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil { return fmt.Errorf("failed updating index: %s", err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index b48e0a04dc..fd88ac2e10 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -555,7 +555,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { ) run := func(t *testing.T, peerName string) { - verifyNode := func(t *testing.T, s *Store, nodeLookup string) { + verifyNode := func(t *testing.T, s *Store, nodeLookup string, expectIdx uint64) { idx, out, err := s.GetNode(nodeLookup, nil, peerName) require.NoError(t, err) byID := false @@ -566,7 +566,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { } require.NotNil(t, out) - require.Equal(t, uint64(1), idx) + require.Equal(t, expectIdx, idx) require.Equal(t, "1.2.3.4", out.Address) if byID { @@ -661,8 +661,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { require.NoError(t, restore.Commit()) // Retrieve the node and verify its contents. - verifyNode(t, s, nodeID) - verifyNode(t, s, nodeName) + verifyNode(t, s, nodeID, 1) + verifyNode(t, s, nodeName, 1) }) // Add in a service definition. @@ -686,8 +686,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { require.NoError(t, restore.Commit()) // Verify that the service got registered. 
- verifyNode(t, s, nodeID) - verifyNode(t, s, nodeName) + verifyNode(t, s, nodeID, 2) + verifyNode(t, s, nodeName, 2) verifyService(t, s, nodeID) verifyService(t, s, nodeName) }) @@ -726,8 +726,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { require.NoError(t, restore.Commit()) // Verify that the check got registered. - verifyNode(t, s, nodeID) - verifyNode(t, s, nodeName) + verifyNode(t, s, nodeID, 2) + verifyNode(t, s, nodeName, 2) verifyService(t, s, nodeID) verifyService(t, s, nodeName) verifyCheck(t, s) @@ -776,8 +776,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { require.NoError(t, restore.Commit()) // Verify that the additional check got registered. - verifyNode(t, s, nodeID) - verifyNode(t, s, nodeName) + verifyNode(t, s, nodeID, 2) + verifyNode(t, s, nodeName, 2) verifyService(t, s, nodeID) verifyService(t, s, nodeName) verifyChecks(t, s) @@ -976,7 +976,7 @@ func TestNodeRenamingNodes(t *testing.T) { Address: "1.1.1.2", } if err := s.EnsureNode(10, in2Modify); err != nil { - t.Fatalf("Renaming node2 into node1 should fail") + t.Fatalf("Renaming node2 into node1 should not fail: " + err.Error()) } // Retrieve the node again @@ -1550,20 +1550,16 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Indexes were updated. 
- for _, tbl := range []string{tableNodes, tableServices, tableChecks} { - if idx := s.maxIndex(tbl); idx != 3 { - t.Fatalf("bad index: %d (%s)", idx, tbl) - } - } + assert.Equal(t, uint64(3), catalogChecksMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(3), catalogServicesMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(3), catalogNodesMaxIndex(tx, nil, "")) // Deleting a nonexistent node should be idempotent and not return // an error if err := s.DeleteNode(4, "node1", nil, ""); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex(tableNodes); idx != 3 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(3), catalogNodesMaxIndex(s.db.ReadTxn(), nil, "")) } func TestStateStore_Node_Snapshot(t *testing.T) { @@ -1690,7 +1686,8 @@ func TestStateStore_EnsureService(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if idx != 30 { + // expect node1's max idx + if idx != 20 { t.Fatalf("bad index: %d", idx) } @@ -1713,9 +1710,7 @@ func TestStateStore_EnsureService(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex(tableServices); idx != 30 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(30), catalogServicesMaxIndex(s.db.ReadTxn(), nil, "")) // Update a service registration. ns1.Address = "1.1.1.2" @@ -1744,9 +1739,7 @@ func TestStateStore_EnsureService(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex(tableServices); idx != 40 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(40), catalogServicesMaxIndex(s.db.ReadTxn(), nil, "")) } func TestStateStore_EnsureService_connectProxy(t *testing.T) { @@ -2571,21 +2564,15 @@ func TestStateStore_DeleteService(t *testing.T) { } // Index tables were updated. 
- if idx := s.maxIndex(tableServices); idx != 4 { - t.Fatalf("bad index: %d", idx) - } - if idx := s.maxIndex(tableChecks); idx != 4 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(4), catalogChecksMaxIndex(tx, nil, "")) + assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, "")) // Deleting a nonexistent service should be idempotent and not return an // error, nor fire a watch. if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex(tableServices); idx != 4 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, "")) if watchFired(ws) { t.Fatalf("bad") } @@ -2906,9 +2893,7 @@ func TestStateStore_EnsureCheck(t *testing.T) { testCheckOutput(t, 5, 5, "bbbmodified") // Index tables were updated - if idx := s.maxIndex(tableChecks); idx != 5 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(5), catalogChecksMaxIndex(s.db.ReadTxn(), nil, "")) } func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) { @@ -3387,9 +3372,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { if idx, check, err := s.NodeCheck("node1", "check1", nil, ""); idx != 3 || err != nil || check != nil { t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err) } - if idx := s.maxIndex(tableChecks); idx != 3 { - t.Fatalf("bad index for checks: %d", idx) - } + assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, "")) if !watchFired(ws) { t.Fatalf("bad") } @@ -3407,18 +3390,14 @@ func TestStateStore_DeleteCheck(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex(tableChecks); idx != 3 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, "")) // Deleting a nonexistent check should be idempotent and not return an // error. 
if err := s.DeleteCheck(4, "node1", "check1", nil, ""); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex(tableChecks); idx != 3 { - t.Fatalf("bad index: %d", idx) - } + assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, "")) if watchFired(ws) { t.Fatalf("bad") } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index d8aa98dd98..598409a2ce 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -263,25 +263,25 @@ func (s *Store) Abandon() { } // maxIndex is a helper used to retrieve the highest known index -// amongst a set of tables in the db. -func (s *Store) maxIndex(tables ...string) uint64 { +// amongst a set of index keys (e.g. table names) in the db. +func (s *Store) maxIndex(keys ...string) uint64 { tx := s.db.Txn(false) defer tx.Abort() - return maxIndexTxn(tx, tables...) + return maxIndexTxn(tx, keys...) } // maxIndexTxn is a helper used to retrieve the highest known index -// amongst a set of tables in the db. -func maxIndexTxn(tx ReadTxn, tables ...string) uint64 { - return maxIndexWatchTxn(tx, nil, tables...) +// amongst a set of index keys (e.g. table names) in the db. +func maxIndexTxn(tx ReadTxn, keys ...string) uint64 { + return maxIndexWatchTxn(tx, nil, keys...) } -func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { +func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, keys ...string) uint64 { var lindex uint64 - for _, table := range tables { - ch, ti, err := tx.FirstWatch(tableIndex, "id", table) + for _, key := range keys { + ch, ti, err := tx.FirstWatch(tableIndex, "id", key) if err != nil { - panic(fmt.Sprintf("unknown index: %s err: %s", table, err)) + panic(fmt.Sprintf("unknown index: %s err: %s", key, err)) } if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex { lindex = idx.Value