Add per-node max indexes (#12399)

Adds fine-grained node.[node] entries to the index table, allowing blocking queries to return fine-grained indexes that prevent them from returning immediately when unrelated nodes/services are updated.

Co-authored-by: kisunji <ckim@hashicorp.com>
This commit is contained in:
Will Jordan 2022-06-23 08:13:25 -07:00 committed by GitHub
parent ba89a7d9b0
commit 34ecbc1d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 85 deletions

3
.changelog/12399.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:enhancement
catalog: Add per-node indexes to reduce watchset firing for unrelated nodes and services.
```

View File

@ -17,9 +17,15 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
// indexServiceExtinction keeps track of the last raft index when the last instance const (
// of any service was unregistered. This is used by blocking queries on missing services. // indexServiceExtinction keeps track of the last raft index when the last instance
const indexServiceExtinction = "service_last_extinction" // of any service was unregistered. This is used by blocking queries on missing services.
indexServiceExtinction = "service_last_extinction"
// indexNodeExtinction keeps track of the last raft index when the last instance
// of any node was unregistered. This is used by blocking queries on missing nodes.
indexNodeExtinction = "node_last_extinction"
)
const ( const (
// minUUIDLookupLen is used as a minimum length of a node name required before // minUUIDLookupLen is used as a minimum length of a node name required before
@ -414,8 +420,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
// We are actually renaming a node, remove its reference first // We are actually renaming a node, remove its reference first
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName) err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName)
if err != nil { if err != nil {
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s", return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s: %w",
node.ID, node.Address, n.Node, node.Node) node.ID, node.Address, n.Node, node.Node, err)
} }
} }
} else { } else {
@ -764,6 +770,15 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
// Clean up node entry from index table
if err := tx.Delete(tableIndex, &IndexEntry{Key: nodeIndexName(nodeName, entMeta, node.PeerName)}); err != nil {
return fmt.Errorf("failed deleting nodeIndex %q: %w", nodeIndexName(nodeName, entMeta, node.PeerName), err)
}
if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta, node.PeerName); err != nil {
return err
}
if peerName == "" { if peerName == "" {
// Invalidate any sessions for this node. // Invalidate any sessions for this node.
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault()) toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
@ -1683,9 +1698,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition() entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
} }
// Get the table index.
idx := catalogMaxIndex(tx, entMeta, peerName, false)
// Query the node by node name // Query the node by node name
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{ watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
Value: nodeNameOrID, Value: nodeNameOrID,
@ -1712,16 +1724,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
}) })
if err != nil { if err != nil {
ws.Add(watchCh) ws.Add(watchCh)
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
// failing once a logger has been introduced to the catalog. return true, idx, nil, nil, nil
return true, 0, nil, nil, nil
} }
n = iter.Next() n = iter.Next()
if n == nil { if n == nil {
// No nodes matched, even with the Node ID: add a watch on the node name. // No nodes matched, even with the Node ID: add a watch on the node name.
ws.Add(watchCh) ws.Add(watchCh)
return true, 0, nil, nil, nil idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
return true, idx, nil, nil, nil
} }
idWatchCh := iter.WatchCh() idWatchCh := iter.WatchCh()
@ -1745,6 +1757,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
} }
ws.Add(services.WatchCh()) ws.Add(services.WatchCh())
// Get the table index.
idx := catalogNodeMaxIndex(tx, nodeName, entMeta, peerName)
return false, idx, node, services, nil return false, idx, node, services, nil
} }
@ -1902,10 +1917,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
svc := service.(*structs.ServiceNode) svc := service.(*structs.ServiceNode)
if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil { if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil {
return err return fmt.Errorf("failed updating services indexes: %w", err)
} }
if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil { if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err return fmt.Errorf("failed updating service-kind indexes: %w", err)
}
// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
} }
name := svc.CompoundServiceName() name := svc.CompoundServiceName()
@ -1930,7 +1952,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName) _, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName)
if err == nil && serviceIndex != nil { if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it // we found service.<serviceName> index, garbage collect it
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil { if err := tx.Delete(tableIndex, serviceIndex); err != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
} }
} }

View File

@ -7,7 +7,7 @@ import (
"fmt" "fmt"
"strings" "strings"
memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -24,8 +24,12 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *acl.EnterpriseMeta, peerN
return peeredIndexEntryName(base, peerName) return peeredIndexEntryName(base, peerName)
} }
func nodeIndexName(name string, _ *acl.EnterpriseMeta, peerName string) string {
return peeredIndexEntryName(fmt.Sprintf("node.%s", name), peerName)
}
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall nodes index // overall nodes index for snapshot and ListNodes RPC
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil { if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
@ -38,12 +42,22 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, p
return nil return nil
} }
// catalogUpdateNodeIndexes upserts the max index for a single node
func catalogUpdateNodeIndexes(tx WriteTxn, idx uint64, nodeName string, _ *acl.EnterpriseMeta, peerName string) error {
// per-node index
if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil, peerName)); err != nil {
return fmt.Errorf("failed updating node index: %w", err)
}
return nil
}
// catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels // catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels
// of granularity (no-op if `idx` is lower than what exists for that index key): // of granularity (no-op if `idx` is lower than what exists for that index key):
// - all services // - all services
// - all services in a specified peer (including internal) // - all services in a specified peer (including internal)
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// overall services index // overall services index for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil { if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
return fmt.Errorf("failed updating index for services table: %w", err) return fmt.Errorf("failed updating index for services table: %w", err)
} }
@ -84,14 +98,16 @@ func catalogUpdateServiceIndexes(tx WriteTxn, idx uint64, serviceName string, _
} }
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, indexServiceExtinction); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %w", err)
}
// update the peer index
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil { if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing service extinction peered index: %w", err) return fmt.Errorf("failed updating missing service extinction peered index: %w", err)
} }
return nil
}
func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexNodeExtinction, peerName)); err != nil {
return fmt.Errorf("failed updating missing node extinction peered index: %w", err)
}
return nil return nil
} }
@ -105,7 +121,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
} }
if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil { if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return err return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta(), node.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
} }
// Update the node's service indexes as the node information is included // Update the node's service indexes as the node information is included
@ -125,15 +144,23 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
} }
if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil { if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err return fmt.Errorf("failed updating services indexes: %w", err)
} }
if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil { if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err return fmt.Errorf("failed updating service indexes: %w", err)
} }
if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil { if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return err return fmt.Errorf("failed updating service-kind indexes: %w", err)
}
// Update the node indexes as the service information is included in node catalog queries.
if err := catalogUpdateNodesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating nodes indexes: %w", err)
}
if err := catalogUpdateNodeIndexes(tx, svc.ModifyIndex, svc.Node, &svc.EnterpriseMeta, svc.PeerName); err != nil {
return fmt.Errorf("failed updating node indexes: %w", err)
} }
return nil return nil
@ -143,6 +170,14 @@ func catalogNodesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) ui
return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName)) return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName))
} }
func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, nodeIndexName(nodeName, nil, peerName))
}
func catalogNodeLastExtinctionIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(indexNodeExtinction, peerName))
}
func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 { func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName)) return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName))
} }
@ -185,7 +220,6 @@ func catalogMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string, checks
} }
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 { func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 {
// TODO(peering_indexes): pipe peerName here
if checks { if checks {
return maxIndexWatchTxn(tx, ws, return maxIndexWatchTxn(tx, ws,
peeredIndexEntryName(tableChecks, peerName), peeredIndexEntryName(tableChecks, peerName),
@ -200,7 +234,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta,
} }
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error { func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
// update the universal index entry // update the overall index entry for snapshot
if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil { if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }

View File

@ -555,7 +555,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
) )
run := func(t *testing.T, peerName string) { run := func(t *testing.T, peerName string) {
verifyNode := func(t *testing.T, s *Store, nodeLookup string) { verifyNode := func(t *testing.T, s *Store, nodeLookup string, expectIdx uint64) {
idx, out, err := s.GetNode(nodeLookup, nil, peerName) idx, out, err := s.GetNode(nodeLookup, nil, peerName)
require.NoError(t, err) require.NoError(t, err)
byID := false byID := false
@ -566,7 +566,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
} }
require.NotNil(t, out) require.NotNil(t, out)
require.Equal(t, uint64(1), idx) require.Equal(t, expectIdx, idx)
require.Equal(t, "1.2.3.4", out.Address) require.Equal(t, "1.2.3.4", out.Address)
if byID { if byID {
@ -661,8 +661,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
require.NoError(t, restore.Commit()) require.NoError(t, restore.Commit())
// Retrieve the node and verify its contents. // Retrieve the node and verify its contents.
verifyNode(t, s, nodeID) verifyNode(t, s, nodeID, 1)
verifyNode(t, s, nodeName) verifyNode(t, s, nodeName, 1)
}) })
// Add in a service definition. // Add in a service definition.
@ -686,8 +686,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
require.NoError(t, restore.Commit()) require.NoError(t, restore.Commit())
// Verify that the service got registered. // Verify that the service got registered.
verifyNode(t, s, nodeID) verifyNode(t, s, nodeID, 2)
verifyNode(t, s, nodeName) verifyNode(t, s, nodeName, 2)
verifyService(t, s, nodeID) verifyService(t, s, nodeID)
verifyService(t, s, nodeName) verifyService(t, s, nodeName)
}) })
@ -726,8 +726,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
require.NoError(t, restore.Commit()) require.NoError(t, restore.Commit())
// Verify that the check got registered. // Verify that the check got registered.
verifyNode(t, s, nodeID) verifyNode(t, s, nodeID, 2)
verifyNode(t, s, nodeName) verifyNode(t, s, nodeName, 2)
verifyService(t, s, nodeID) verifyService(t, s, nodeID)
verifyService(t, s, nodeName) verifyService(t, s, nodeName)
verifyCheck(t, s) verifyCheck(t, s)
@ -776,8 +776,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
require.NoError(t, restore.Commit()) require.NoError(t, restore.Commit())
// Verify that the additional check got registered. // Verify that the additional check got registered.
verifyNode(t, s, nodeID) verifyNode(t, s, nodeID, 2)
verifyNode(t, s, nodeName) verifyNode(t, s, nodeName, 2)
verifyService(t, s, nodeID) verifyService(t, s, nodeID)
verifyService(t, s, nodeName) verifyService(t, s, nodeName)
verifyChecks(t, s) verifyChecks(t, s)
@ -976,7 +976,7 @@ func TestNodeRenamingNodes(t *testing.T) {
Address: "1.1.1.2", Address: "1.1.1.2",
} }
if err := s.EnsureNode(10, in2Modify); err != nil { if err := s.EnsureNode(10, in2Modify); err != nil {
t.Fatalf("Renaming node2 into node1 should fail") t.Fatalf("Renaming node2 into node1 should not fail: " + err.Error())
} }
// Retrieve the node again // Retrieve the node again
@ -1550,20 +1550,16 @@ func TestStateStore_DeleteNode(t *testing.T) {
} }
// Indexes were updated. // Indexes were updated.
for _, tbl := range []string{tableNodes, tableServices, tableChecks} { assert.Equal(t, uint64(3), catalogChecksMaxIndex(tx, nil, ""))
if idx := s.maxIndex(tbl); idx != 3 { assert.Equal(t, uint64(3), catalogServicesMaxIndex(tx, nil, ""))
t.Fatalf("bad index: %d (%s)", idx, tbl) assert.Equal(t, uint64(3), catalogNodesMaxIndex(tx, nil, ""))
}
}
// Deleting a nonexistent node should be idempotent and not return // Deleting a nonexistent node should be idempotent and not return
// an error // an error
if err := s.DeleteNode(4, "node1", nil, ""); err != nil { if err := s.DeleteNode(4, "node1", nil, ""); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx := s.maxIndex(tableNodes); idx != 3 { assert.Equal(t, uint64(3), catalogNodesMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
} }
func TestStateStore_Node_Snapshot(t *testing.T) { func TestStateStore_Node_Snapshot(t *testing.T) {
@ -1690,7 +1686,8 @@ func TestStateStore_EnsureService(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 30 { // expect node1's max idx
if idx != 20 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -1713,9 +1710,7 @@ func TestStateStore_EnsureService(t *testing.T) {
} }
// Index tables were updated. // Index tables were updated.
if idx := s.maxIndex(tableServices); idx != 30 { assert.Equal(t, uint64(30), catalogServicesMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
// Update a service registration. // Update a service registration.
ns1.Address = "1.1.1.2" ns1.Address = "1.1.1.2"
@ -1744,9 +1739,7 @@ func TestStateStore_EnsureService(t *testing.T) {
} }
// Index tables were updated. // Index tables were updated.
if idx := s.maxIndex(tableServices); idx != 40 { assert.Equal(t, uint64(40), catalogServicesMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
} }
func TestStateStore_EnsureService_connectProxy(t *testing.T) { func TestStateStore_EnsureService_connectProxy(t *testing.T) {
@ -2571,21 +2564,15 @@ func TestStateStore_DeleteService(t *testing.T) {
} }
// Index tables were updated. // Index tables were updated.
if idx := s.maxIndex(tableServices); idx != 4 { assert.Equal(t, uint64(4), catalogChecksMaxIndex(tx, nil, ""))
t.Fatalf("bad index: %d", idx) assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, ""))
}
if idx := s.maxIndex(tableChecks); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
// Deleting a nonexistent service should be idempotent and not return an // Deleting a nonexistent service should be idempotent and not return an
// error, nor fire a watch. // error, nor fire a watch.
if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil { if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx := s.maxIndex(tableServices); idx != 4 { assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, ""))
t.Fatalf("bad index: %d", idx)
}
if watchFired(ws) { if watchFired(ws) {
t.Fatalf("bad") t.Fatalf("bad")
} }
@ -2906,9 +2893,7 @@ func TestStateStore_EnsureCheck(t *testing.T) {
testCheckOutput(t, 5, 5, "bbbmodified") testCheckOutput(t, 5, 5, "bbbmodified")
// Index tables were updated // Index tables were updated
if idx := s.maxIndex(tableChecks); idx != 5 { assert.Equal(t, uint64(5), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
} }
func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) { func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) {
@ -3387,9 +3372,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
if idx, check, err := s.NodeCheck("node1", "check1", nil, ""); idx != 3 || err != nil || check != nil { if idx, check, err := s.NodeCheck("node1", "check1", nil, ""); idx != 3 || err != nil || check != nil {
t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err) t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err)
} }
if idx := s.maxIndex(tableChecks); idx != 3 { assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index for checks: %d", idx)
}
if !watchFired(ws) { if !watchFired(ws) {
t.Fatalf("bad") t.Fatalf("bad")
} }
@ -3407,18 +3390,14 @@ func TestStateStore_DeleteCheck(t *testing.T) {
} }
// Index tables were updated. // Index tables were updated.
if idx := s.maxIndex(tableChecks); idx != 3 { assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
// Deleting a nonexistent check should be idempotent and not return an // Deleting a nonexistent check should be idempotent and not return an
// error. // error.
if err := s.DeleteCheck(4, "node1", "check1", nil, ""); err != nil { if err := s.DeleteCheck(4, "node1", "check1", nil, ""); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx := s.maxIndex(tableChecks); idx != 3 { assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
t.Fatalf("bad index: %d", idx)
}
if watchFired(ws) { if watchFired(ws) {
t.Fatalf("bad") t.Fatalf("bad")
} }

View File

@ -263,25 +263,25 @@ func (s *Store) Abandon() {
} }
// maxIndex is a helper used to retrieve the highest known index // maxIndex is a helper used to retrieve the highest known index
// amongst a set of tables in the db. // amongst a set of index keys (e.g. table names) in the db.
func (s *Store) maxIndex(tables ...string) uint64 { func (s *Store) maxIndex(keys ...string) uint64 {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
return maxIndexTxn(tx, tables...) return maxIndexTxn(tx, keys...)
} }
// maxIndexTxn is a helper used to retrieve the highest known index // maxIndexTxn is a helper used to retrieve the highest known index
// amongst a set of tables in the db. // amongst a set of index keys (e.g. table names) in the db.
func maxIndexTxn(tx ReadTxn, tables ...string) uint64 { func maxIndexTxn(tx ReadTxn, keys ...string) uint64 {
return maxIndexWatchTxn(tx, nil, tables...) return maxIndexWatchTxn(tx, nil, keys...)
} }
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, keys ...string) uint64 {
var lindex uint64 var lindex uint64
for _, table := range tables { for _, key := range keys {
ch, ti, err := tx.FirstWatch(tableIndex, "id", table) ch, ti, err := tx.FirstWatch(tableIndex, "id", key)
if err != nil { if err != nil {
panic(fmt.Sprintf("unknown index: %s err: %s", table, err)) panic(fmt.Sprintf("unknown index: %s err: %s", key, err))
} }
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex { if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
lindex = idx.Value lindex = idx.Value