state: use tx.First instead of tx.FirstWatch

Where appropriate. After removing the helper function a bunch of  these calls can
be changed to tx.First.
This commit is contained in:
Daniel Nephin 2021-03-22 17:09:22 -04:00
parent 1cdcfb8260
commit 30281a5332
3 changed files with 15 additions and 16 deletions

View File

@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
// node info above to make sure we actually need to update the service
// definition in order to prevent useless churn if nothing has changed.
if req.Service != nil {
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
// Look up the existing node's Serf health check to see if it's failed.
// If it is, the node can be renamed.
_, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
if err != nil {
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
}
@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
// Returns an error if the write didn't happen and nil if write was successful.
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
// Retrieve the existing service.
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
// existing memdb transaction.
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1148,7 +1148,7 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
}
// Query the service
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
if err != nil {
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
@ -1321,8 +1321,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
entMeta = structs.DefaultEnterpriseMeta()
}
// Look up the service.
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1471,7 +1470,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe
// checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check
_, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
existing, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
if err != nil {
return fmt.Errorf("failed health check lookup: %s", err)
}
@ -1503,7 +1502,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -1584,7 +1583,7 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta
}
// Return the check.
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
check, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
@ -1810,7 +1809,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
}
// Try to retrieve the existing health check.
_, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
hc, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
if err != nil {
return fmt.Errorf("check lookup failed: %s", err)
}
@ -1825,7 +1824,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
return err
}
_, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
svcRaw, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
if err != nil {
return fmt.Errorf("failed retrieving service from state store: %v", err)
}

View File

@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) {
// that it actually is removed in the state store.
tx := s.db.Txn(false)
defer tx.Abort()
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
if err != nil || check != nil {
t.Fatalf("bad: %#v (err: %s)", check, err)
}

View File

@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
tx := s.db.Txn(false)
defer tx.Abort()
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv
tx := s.db.Txn(false)
defer tx.Abort()
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
tx := s.db.Txn(false)
defer tx.Abort()
_, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)})
c, err := tx.First(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)})
if err != nil {
t.Fatalf("err: %s", err)
}