2
0
mirror of https://github.com/status-im/consul.git synced 2025-01-24 04:31:12 +00:00

Merge pull request from hashicorp/dnephin/state-index-checks

state: convert remaining checks table indexers to functional pattern
This commit is contained in:
Daniel Nephin 2021-03-31 11:53:21 -04:00 committed by GitHub
commit 26440d9e1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 36 deletions

@ -1638,8 +1638,11 @@ func (s *Store) ServiceChecks(ws memdb.WatchSet, serviceName string, entMeta *st
// Get the table index. // Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta) idx := catalogChecksMaxIndex(tx, entMeta)
// Return the checks. if entMeta == nil {
iter, err := catalogListChecksByService(tx, serviceName, entMeta) entMeta = structs.DefaultEnterpriseMeta()
}
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
iter, err := tx.Get(tableChecks, indexService, q)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err) return 0, nil, fmt.Errorf("failed check lookup: %s", err)
} }
@ -1663,8 +1666,12 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
// Get the table index. // Get the table index.
idx := maxIndexForService(tx, serviceName, true, true, entMeta) idx := maxIndexForService(tx, serviceName, true, true, entMeta)
// Return the checks.
iter, err := catalogListChecksByService(tx, serviceName, entMeta) if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
iter, err := tx.Get(tableChecks, indexService, q)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err) return 0, nil, fmt.Errorf("failed check lookup: %s", err)
} }
@ -1709,13 +1716,18 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru
// Get the table index. // Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta) idx := catalogChecksMaxIndex(tx, entMeta)
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Query all checks if HealthAny is passed, otherwise use the index. // Query all checks if HealthAny is passed, otherwise use the index.
var iter memdb.ResultIterator var iter memdb.ResultIterator
var err error var err error
if state == api.HealthAny { if state == api.HealthAny {
iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta) iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta)
} else { } else {
iter, err = catalogListChecksInState(tx, state, entMeta) q := Query{Value: state, EnterpriseMeta: *entMeta}
iter, err = tx.Get(tableChecks, indexStatus, q)
} }
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err) return 0, nil, fmt.Errorf("failed check lookup: %s", err)
@ -1857,7 +1869,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
} }
// Delete the check from the DB and update the index. // Delete the check from the DB and update the index.
if err := tx.Delete("checks", hc); err != nil { if err := tx.Delete(tableChecks, hc); err != nil {
return fmt.Errorf("failed removing check: %s", err) return fmt.Errorf("failed removing check: %s", err)
} }

@ -175,7 +175,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change} srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
markService(newNodeServiceTupleFromServiceNode(sn), srvChange) markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
case "checks": case tableChecks:
// For health we only care about the scope for now to know if it's just // For health we only care about the scope for now to know if it's just
// affecting a single service or every service on a node. There is a // affecting a single service or every service on a node. There is a
// subtle edge case where the check with same ID changes from being node // subtle edge case where the check with same ID changes from being node

@ -107,46 +107,37 @@ func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (i
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks { if checks {
return maxIndexTxn(tx, "nodes", tableServices, "checks") return maxIndexTxn(tx, "nodes", tableServices, tableChecks)
} }
return maxIndexTxn(tx, "nodes", tableServices) return maxIndexTxn(tx, "nodes", tableServices)
} }
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 { func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks { if checks {
return maxIndexWatchTxn(tx, ws, "nodes", tableServices, "checks") return maxIndexWatchTxn(tx, ws, "nodes", tableServices, tableChecks)
} }
return maxIndexWatchTxn(tx, ws, "nodes", tableServices) return maxIndexWatchTxn(tx, ws, "nodes", tableServices)
} }
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry // update the universal index entry
if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{tableChecks, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
return nil return nil
} }
func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "checks") return maxIndexTxn(tx, tableChecks)
} }
func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) { func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) {
return tx.Get(tableChecks, indexNode, q) return tx.Get(tableChecks, indexNode, q)
} }
func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "service", service)
}
func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// simpler than normal due to the use of the CompoundMultiIndex
return tx.Get("checks", "status", state)
}
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check // Insert the check
if err := tx.Insert("checks", chk); err != nil { if err := tx.Insert(tableChecks, chk); err != nil {
return fmt.Errorf("failed inserting check: %s", err) return fmt.Errorf("failed inserting check: %s", err)
} }

@ -10,7 +10,9 @@ func testIndexerTableChecks() map[string]indexerTestCase {
obj := &structs.HealthCheck{ obj := &structs.HealthCheck{
Node: "NoDe", Node: "NoDe",
ServiceID: "SeRvIcE", ServiceID: "SeRvIcE",
ServiceName: "ServiceName",
CheckID: "CheckID", CheckID: "CheckID",
Status: "PASSING",
} }
return map[string]indexerTestCase{ return map[string]indexerTestCase{
indexID: { indexID: {
@ -36,6 +38,26 @@ func testIndexerTableChecks() map[string]indexerTestCase {
}, },
}, },
}, },
indexStatus: {
read: indexValue{
source: Query{Value: "PASSING"},
expected: []byte("passing\x00"),
},
write: indexValue{
source: obj,
expected: []byte("passing\x00"),
},
},
indexService: {
read: indexValue{
source: Query{Value: "ServiceName"},
expected: []byte("servicename\x00"),
},
write: indexValue{
source: obj,
expected: []byte("servicename\x00"),
},
},
indexNodeService: { indexNodeService: {
read: indexValue{ read: indexValue{
source: NodeServiceQuery{ source: NodeServiceQuery{

@ -258,18 +258,18 @@ func checksTableSchema() *memdb.TableSchema {
Name: indexStatus, Name: indexStatus,
AllowMissing: false, AllowMissing: false,
Unique: false, Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: indexerSingle{
Field: "Status", readIndex: indexFromQuery,
Lowercase: false, writeIndex: indexStatusFromHealthCheck,
}, },
}, },
indexService: { indexService: {
Name: indexService, Name: indexService,
AllowMissing: true, AllowMissing: true,
Unique: false, Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: indexerSingle{
Field: "ServiceName", readIndex: indexFromQuery,
Lowercase: true, writeIndex: indexServiceNameFromHealthCheck,
}, },
}, },
indexNode: { indexNode: {
@ -342,6 +342,36 @@ func indexNodeServiceFromHealthCheck(raw interface{}) ([]byte, error) {
return b.Bytes(), nil return b.Bytes(), nil
} }
func indexStatusFromHealthCheck(raw interface{}) ([]byte, error) {
hc, ok := raw.(*structs.HealthCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
}
if hc.Status == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(hc.Status))
return b.Bytes(), nil
}
func indexServiceNameFromHealthCheck(raw interface{}) ([]byte, error) {
hc, ok := raw.(*structs.HealthCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
}
if hc.ServiceName == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(hc.ServiceName))
return b.Bytes(), nil
}
// gatewayServicesTableSchema returns a new table schema used to store information // gatewayServicesTableSchema returns a new table schema used to store information
// about services associated with terminating gateways. // about services associated with terminating gateways.
func gatewayServicesTableSchema() *memdb.TableSchema { func gatewayServicesTableSchema() *memdb.TableSchema {

@ -1316,7 +1316,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
} }
// Indexes were updated. // Indexes were updated.
for _, tbl := range []string{"nodes", tableServices, "checks"} { for _, tbl := range []string{"nodes", tableServices, tableChecks} {
if idx := s.maxIndex(tbl); idx != 3 { if idx := s.maxIndex(tbl); idx != 3 {
t.Fatalf("bad index: %d (%s)", idx, tbl) t.Fatalf("bad index: %d (%s)", idx, tbl)
} }
@ -2076,7 +2076,7 @@ func TestStateStore_DeleteService(t *testing.T) {
if idx := s.maxIndex(tableServices); idx != 4 { if idx := s.maxIndex(tableServices); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if idx := s.maxIndex("checks"); idx != 4 { if idx := s.maxIndex(tableChecks); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -2411,7 +2411,7 @@ func TestStateStore_EnsureCheck(t *testing.T) {
testCheckOutput(t, 5, 5, "bbbmodified") testCheckOutput(t, 5, 5, "bbbmodified")
// Index tables were updated // Index tables were updated
if idx := s.maxIndex("checks"); idx != 5 { if idx := s.maxIndex(tableChecks); idx != 5 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
} }
@ -2894,7 +2894,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
if idx, check, err := s.NodeCheck("node1", "check1", nil); idx != 3 || err != nil || check != nil { if idx, check, err := s.NodeCheck("node1", "check1", nil); idx != 3 || err != nil || check != nil {
t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err) t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err)
} }
if idx := s.maxIndex("checks"); idx != 3 { if idx := s.maxIndex(tableChecks); idx != 3 {
t.Fatalf("bad index for checks: %d", idx) t.Fatalf("bad index for checks: %d", idx)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -2914,7 +2914,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
} }
// Index tables were updated. // Index tables were updated.
if idx := s.maxIndex("checks"); idx != 3 { if idx := s.maxIndex(tableChecks); idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -2923,7 +2923,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
if err := s.DeleteCheck(4, "node1", "check1", nil); err != nil { if err := s.DeleteCheck(4, "node1", "check1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx := s.maxIndex("checks"); idx != 3 { if idx := s.maxIndex(tableChecks); idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if watchFired(ws) { if watchFired(ws) {