state: move services.ID to new pattern

This commit is contained in:
Daniel Nephin 2021-02-12 13:39:38 -05:00
parent 199a0b0c19
commit d90845f26d
7 changed files with 44 additions and 25 deletions

View File

@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
// node info above to make sure we actually need to update the service
// definition in order to prevent useless churn if nothing has changed.
if req.Service != nil {
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &req.Service.EnterpriseMeta, req.Node, req.Service.ID)
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
// Returns an error if the write didn't happen and nil if write was successful.
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
// Retrieve the existing service.
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
// existing memdb transaction.
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1142,8 +1142,13 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
}
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// TODO: pass non-pointer type for ent meta
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Query the service
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
if err != nil {
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
@ -1311,8 +1316,13 @@ func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, ser
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
// TODO: pass non-pointer type for ent meta
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Look up the service.
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1493,7 +1503,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", &hc.EnterpriseMeta, hc.Node, hc.ServiceID)
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1806,7 +1816,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
return err
}
_, svcRaw, err := firstWatchCompoundWithTxn(tx, "services", "id", &existing.EnterpriseMeta, existing.Node, existing.ServiceID)
_, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
if err != nil {
return fmt.Errorf("failed retrieving service from state store: %v", err)
}
@ -2293,7 +2303,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
// We don't want to track an unlimited number of services, so we pull a
// top-level watch to use as a fallback.
allServices, err := tx.Get("services", "id")
allServices, err := tx.Get(tableServices, indexID)
if err != nil {
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
}

View File

@ -95,6 +95,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
}
}
// TODO: this could use NodeServiceQuery
type nodeServiceTuple struct {
Node string
ServiceID string
@ -569,7 +570,7 @@ func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTu
return stream.Event{}, err
}
svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID)
svc, err := tx.Get(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: tuple.EntMeta, Node: tuple.Node, Service: tuple.ServiceID})
if err != nil {
return stream.Event{}, err
}

View File

@ -87,6 +87,22 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
return b.Bytes(), nil
}
func indexFromServiceNode(raw interface{}) ([]byte, error) {
n, ok := raw.(*structs.ServiceNode)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
}
if n.Node == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(n.Node))
b.String(strings.ToLower(n.ServiceID))
return b.Bytes(), nil
}
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
return fmt.Sprintf("service.%s", name)
}
@ -169,7 +185,7 @@ func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.Serv
}
func catalogServiceList(tx ReadTxn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
return tx.Get("services", "id")
return tx.Get(tableServices, indexID)
}
func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

View File

@ -72,17 +72,9 @@ func servicesTableSchema() *memdb.TableSchema {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "ServiceID",
Lowercase: true,
},
},
Indexer: indexerSingle{
readIndex: readIndex(indexFromNodeServiceQuery),
writeIndex: writeIndex(indexFromServiceNode),
},
},
indexNode: {

View File

@ -1298,7 +1298,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
// the DB to make sure it is actually gone.
tx := s.db.Txn(false)
defer tx.Abort()
services, err := getCompoundWithTxn(tx, "services", "id", nil, "node1", "service1")
services, err := tx.Get(tableServices, indexID, NodeServiceQuery{Node: "node1", Service: "service1"})
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
tx := s.db.Txn(false)
defer tx.Abort()
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv
tx := s.db.Txn(false)
defer tx.Abort()
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", nil, nodeID, serviceID)
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -150,7 +150,7 @@ table=services
index=connect allow-missing
indexer=github.com/hashicorp/consul/agent/consul/state.IndexConnectService
index=id unique
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromServiceNode
index=kind
indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind
index=node