Rename txnWrapper to txn

This commit is contained in:
Daniel Nephin 2020-06-03 13:21:00 -04:00
parent 32aa3ada35
commit f7c84ad802
22 changed files with 235 additions and 235 deletions

View File

@ -315,10 +315,10 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
// CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index // CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index
func (s *Store) CanBootstrapACLToken() (bool, uint64, error) { func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
txn := s.db.Txn(false) tx := s.db.Txn(false)
// Lookup the bootstrap sentinel // Lookup the bootstrap sentinel
out, err := txn.First("index", "id", "acl-token-bootstrap") out, err := tx.First("index", "id", "acl-token-bootstrap")
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }
@ -339,7 +339,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
// to update the name. Unlike the older functions to operate specifically on role or policy links // to update the name. Unlike the older functions to operate specifically on role or policy links
// this function does not itself handle the case where the id cannot be found. Instead the // this function does not itself handle the case where the id cannot be found. Instead the
// getName function should handle that and return an error if necessary // getName function should handle that and return an error if necessary
func (s *Store) resolveACLLinks(tx *txnWrapper, links []agentpb.ACLLink, getName func(*txnWrapper, string) (string, error)) (int, error) { func (s *Store) resolveACLLinks(tx *txn, links []agentpb.ACLLink, getName func(*txn, string) (string, error)) (int, error) {
var numValid int var numValid int
for linkIndex, link := range links { for linkIndex, link := range links {
if link.ID != "" { if link.ID != "" {
@ -365,7 +365,7 @@ func (s *Store) resolveACLLinks(tx *txnWrapper, links []agentpb.ACLLink, getName
// associated with the ID of the link. Ideally this will be a no-op if the names are already correct // associated with the ID of the link. Ideally this will be a no-op if the names are already correct
// however if a linked resource was renamed it might be stale. This function will treat the incoming // however if a linked resource was renamed it might be stale. This function will treat the incoming
// links with copy-on-write semantics and its output will indicate whether any modifications were made. // links with copy-on-write semantics and its output will indicate whether any modifications were made.
func (s *Store) fixupACLLinks(tx *txnWrapper, original []agentpb.ACLLink, getName func(*txnWrapper, string) (string, error)) ([]agentpb.ACLLink, bool, error) { func (s *Store) fixupACLLinks(tx *txn, original []agentpb.ACLLink, getName func(*txn, string) (string, error)) ([]agentpb.ACLLink, bool, error) {
owned := false owned := false
links := original links := original
@ -405,7 +405,7 @@ func (s *Store) fixupACLLinks(tx *txnWrapper, original []agentpb.ACLLink, getNam
return links, owned, nil return links, owned, nil
} }
func (s *Store) resolveTokenPolicyLinks(tx *txnWrapper, token *structs.ACLToken, allowMissing bool) (int, error) { func (s *Store) resolveTokenPolicyLinks(tx *txn, token *structs.ACLToken, allowMissing bool) (int, error) {
var numValid int var numValid int
for linkIndex, link := range token.Policies { for linkIndex, link := range token.Policies {
if link.ID != "" { if link.ID != "" {
@ -433,7 +433,7 @@ func (s *Store) resolveTokenPolicyLinks(tx *txnWrapper, token *structs.ACLToken,
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated // stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the policy links are still accurate then we just return the original // token only when fixes are needed. If the policy links are still accurate then we just return the original
// token. // token.
func (s *Store) fixupTokenPolicyLinks(tx *txnWrapper, original *structs.ACLToken) (*structs.ACLToken, error) { func (s *Store) fixupTokenPolicyLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false owned := false
token := original token := original
@ -479,7 +479,7 @@ func (s *Store) fixupTokenPolicyLinks(tx *txnWrapper, original *structs.ACLToken
return token, nil return token, nil
} }
func (s *Store) resolveTokenRoleLinks(tx *txnWrapper, token *structs.ACLToken, allowMissing bool) (int, error) { func (s *Store) resolveTokenRoleLinks(tx *txn, token *structs.ACLToken, allowMissing bool) (int, error) {
var numValid int var numValid int
for linkIndex, link := range token.Roles { for linkIndex, link := range token.Roles {
if link.ID != "" { if link.ID != "" {
@ -507,7 +507,7 @@ func (s *Store) resolveTokenRoleLinks(tx *txnWrapper, token *structs.ACLToken, a
// stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated // stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the role links are still accurate then we just return the original // token only when fixes are needed. If the role links are still accurate then we just return the original
// token. // token.
func (s *Store) fixupTokenRoleLinks(tx *txnWrapper, original *structs.ACLToken) (*structs.ACLToken, error) { func (s *Store) fixupTokenRoleLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false owned := false
token := original token := original
@ -553,7 +553,7 @@ func (s *Store) fixupTokenRoleLinks(tx *txnWrapper, original *structs.ACLToken)
return token, nil return token, nil
} }
func (s *Store) resolveRolePolicyLinks(tx *txnWrapper, role *structs.ACLRole, allowMissing bool) error { func (s *Store) resolveRolePolicyLinks(tx *txn, role *structs.ACLRole, allowMissing bool) error {
for linkIndex, link := range role.Policies { for linkIndex, link := range role.Policies {
if link.ID != "" { if link.ID != "" {
policy, err := s.getPolicyWithTxn(tx, nil, link.ID, s.aclPolicyGetByID, &role.EnterpriseMeta) policy, err := s.getPolicyWithTxn(tx, nil, link.ID, s.aclPolicyGetByID, &role.EnterpriseMeta)
@ -579,7 +579,7 @@ func (s *Store) resolveRolePolicyLinks(tx *txnWrapper, role *structs.ACLRole, al
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated // stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
// role only when fixes are needed. If the policy links are still accurate then we just return the original // role only when fixes are needed. If the policy links are still accurate then we just return the original
// role. // role.
func (s *Store) fixupRolePolicyLinks(tx *txnWrapper, original *structs.ACLRole) (*structs.ACLRole, error) { func (s *Store) fixupRolePolicyLinks(tx *txn, original *structs.ACLRole) (*structs.ACLRole, error) {
owned := false owned := false
role := original role := original
@ -653,7 +653,7 @@ func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas, allo
// aclTokenSetTxn is the inner method used to insert an ACL token with the // aclTokenSetTxn is the inner method used to insert an ACL token with the
// proper indexes into the state store. // proper indexes into the state store.
func (s *Store) aclTokenSetTxn(tx *txnWrapper, idx uint64, token *structs.ACLToken, cas, allowMissingPolicyAndRoleIDs, prohibitUnprivileged, legacy bool) error { func (s *Store) aclTokenSetTxn(tx *txn, idx uint64, token *structs.ACLToken, cas, allowMissingPolicyAndRoleIDs, prohibitUnprivileged, legacy bool) error {
// Check that the ID is set // Check that the ID is set
if token.SecretID == "" { if token.SecretID == "" {
return ErrMissingACLTokenSecret return ErrMissingACLTokenSecret
@ -823,7 +823,7 @@ func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64,
return idx, tokens, nil return idx, tokens, nil
} }
func (s *Store) aclTokenGetTxn(tx *txnWrapper, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) { func (s *Store) aclTokenGetTxn(tx *txn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) {
watchCh, rawToken, err := s.aclTokenGetFromIndex(tx, value, index, entMeta) watchCh, rawToken, err := s.aclTokenGetFromIndex(tx, value, index, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl token lookup: %v", err) return nil, fmt.Errorf("failed acl token lookup: %v", err)
@ -1041,7 +1041,7 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclTokenDeleteTxn(tx *txnWrapper, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclTokenDeleteTxn(tx *txn, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing token // Look up the existing token
_, token, err := s.aclTokenGetFromIndex(tx, value, index, entMeta) _, token, err := s.aclTokenGetFromIndex(tx, value, index, entMeta)
if err != nil { if err != nil {
@ -1059,7 +1059,7 @@ func (s *Store) aclTokenDeleteTxn(tx *txnWrapper, idx uint64, value, index strin
return s.aclTokenDeleteWithToken(tx, token.(*structs.ACLToken), idx) return s.aclTokenDeleteWithToken(tx, token.(*structs.ACLToken), idx)
} }
func (s *Store) aclTokenDeleteAllForAuthMethodTxn(tx *txnWrapper, idx uint64, methodName string, methodMeta *structs.EnterpriseMeta) error { func (s *Store) aclTokenDeleteAllForAuthMethodTxn(tx *txn, idx uint64, methodName string, methodMeta *structs.EnterpriseMeta) error {
// collect all the tokens linked with the given auth method. // collect all the tokens linked with the given auth method.
iter, err := s.aclTokenListByAuthMethod(tx, methodName, methodMeta, structs.WildcardEnterpriseMeta()) iter, err := s.aclTokenListByAuthMethod(tx, methodName, methodMeta, structs.WildcardEnterpriseMeta())
if err != nil { if err != nil {
@ -1108,7 +1108,7 @@ func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error {
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclPolicySetTxn(tx *txnWrapper, idx uint64, policy *structs.ACLPolicy) error { func (s *Store) aclPolicySetTxn(tx *txn, idx uint64, policy *structs.ACLPolicy) error {
// Check that the ID is set // Check that the ID is set
if policy.ID == "" { if policy.ID == "" {
return ErrMissingACLPolicyID return ErrMissingACLPolicyID
@ -1202,9 +1202,9 @@ func (s *Store) ACLPolicyBatchGet(ws memdb.WatchSet, ids []string) (uint64, stru
return idx, policies, nil return idx, policies, nil
} }
type aclPolicyGetFn func(*txnWrapper, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) type aclPolicyGetFn func(*txn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
func (s *Store) getPolicyWithTxn(tx *txnWrapper, ws memdb.WatchSet, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLPolicy, error) { func (s *Store) getPolicyWithTxn(tx *txn, ws memdb.WatchSet, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLPolicy, error) {
watchCh, policy, err := fn(tx, value, entMeta) watchCh, policy, err := fn(tx, value, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl policy lookup: %v", err) return nil, fmt.Errorf("failed acl policy lookup: %v", err)
@ -1284,7 +1284,7 @@ func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, ent
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclPolicyDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclPolicyDeleteTxn(tx *txn, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error {
// Look up the existing token // Look up the existing token
_, rawPolicy, err := fn(tx, value, entMeta) _, rawPolicy, err := fn(tx, value, entMeta)
if err != nil { if err != nil {
@ -1328,7 +1328,7 @@ func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error {
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclRoleSetTxn(tx *txnWrapper, idx uint64, role *structs.ACLRole, allowMissing bool) error { func (s *Store) aclRoleSetTxn(tx *txn, idx uint64, role *structs.ACLRole, allowMissing bool) error {
// Check that the ID is set // Check that the ID is set
if role.ID == "" { if role.ID == "" {
return ErrMissingACLRoleID return ErrMissingACLRoleID
@ -1392,7 +1392,7 @@ func (s *Store) aclRoleSetTxn(tx *txnWrapper, idx uint64, role *structs.ACLRole,
return s.aclRoleInsert(tx, role) return s.aclRoleInsert(tx, role)
} }
type aclRoleGetFn func(*txnWrapper, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) type aclRoleGetFn func(*txn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
func (s *Store) ACLRoleGetByID(ws memdb.WatchSet, id string, entMeta *structs.EnterpriseMeta) (uint64, *structs.ACLRole, error) { func (s *Store) ACLRoleGetByID(ws memdb.WatchSet, id string, entMeta *structs.EnterpriseMeta) (uint64, *structs.ACLRole, error) {
return s.aclRoleGet(ws, id, s.aclRoleGetByID, entMeta) return s.aclRoleGet(ws, id, s.aclRoleGetByID, entMeta)
@ -1423,7 +1423,7 @@ func (s *Store) ACLRoleBatchGet(ws memdb.WatchSet, ids []string) (uint64, struct
return idx, roles, nil return idx, roles, nil
} }
func (s *Store) getRoleWithTxn(tx *txnWrapper, ws memdb.WatchSet, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLRole, error) { func (s *Store) getRoleWithTxn(tx *txn, ws memdb.WatchSet, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLRole, error) {
watchCh, rawRole, err := fn(tx, value, entMeta) watchCh, rawRole, err := fn(tx, value, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl role lookup: %v", err) return nil, fmt.Errorf("failed acl role lookup: %v", err)
@ -1521,7 +1521,7 @@ func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclRoleDeleteTxn(tx *txnWrapper, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclRoleDeleteTxn(tx *txn, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error {
// Look up the existing role // Look up the existing role
_, rawRole, err := fn(tx, value, entMeta) _, rawRole, err := fn(tx, value, entMeta)
if err != nil { if err != nil {
@ -1560,7 +1560,7 @@ func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) erro
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclBindingRuleSetTxn(tx *txnWrapper, idx uint64, rule *structs.ACLBindingRule) error { func (s *Store) aclBindingRuleSetTxn(tx *txn, idx uint64, rule *structs.ACLBindingRule) error {
// Check that the ID and AuthMethod are set // Check that the ID and AuthMethod are set
if rule.ID == "" { if rule.ID == "" {
return ErrMissingACLBindingRuleID return ErrMissingACLBindingRuleID
@ -1676,7 +1676,7 @@ func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.Ent
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclBindingRuleDeleteTxn(tx *txnWrapper, idx uint64, id string, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclBindingRuleDeleteTxn(tx *txn, idx uint64, id string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing binding rule // Look up the existing binding rule
_, rawRule, err := s.aclBindingRuleGetByID(tx, id, entMeta) _, rawRule, err := s.aclBindingRuleGetByID(tx, id, entMeta)
if err != nil { if err != nil {
@ -1695,7 +1695,7 @@ func (s *Store) aclBindingRuleDeleteTxn(tx *txnWrapper, idx uint64, id string, e
return nil return nil
} }
func (s *Store) aclBindingRuleDeleteAllForAuthMethodTxn(tx *txnWrapper, idx uint64, methodName string, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclBindingRuleDeleteAllForAuthMethodTxn(tx *txn, idx uint64, methodName string, entMeta *structs.EnterpriseMeta) error {
// collect them all // collect them all
iter, err := s.aclBindingRuleListByAuthMethod(tx, methodName, entMeta) iter, err := s.aclBindingRuleListByAuthMethod(tx, methodName, entMeta)
if err != nil { if err != nil {
@ -1745,7 +1745,7 @@ func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) erro
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclAuthMethodSetTxn(tx *txnWrapper, idx uint64, method *structs.ACLAuthMethod) error { func (s *Store) aclAuthMethodSetTxn(tx *txn, idx uint64, method *structs.ACLAuthMethod) error {
// Check that the Name and Type are set // Check that the Name and Type are set
if method.Name == "" { if method.Name == "" {
return ErrMissingACLAuthMethodName return ErrMissingACLAuthMethodName
@ -1794,7 +1794,7 @@ func (s *Store) aclAuthMethodGet(ws memdb.WatchSet, name string, entMeta *struct
return idx, method, nil return idx, method, nil
} }
func (s *Store) getAuthMethodWithTxn(tx *txnWrapper, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) { func (s *Store) getAuthMethodWithTxn(tx *txn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) {
watchCh, rawMethod, err := s.aclAuthMethodGetByName(tx, name, entMeta) watchCh, rawMethod, err := s.aclAuthMethodGetByName(tx, name, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl auth method lookup: %v", err) return nil, fmt.Errorf("failed acl auth method lookup: %v", err)
@ -1860,7 +1860,7 @@ func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.En
return tx.Commit() return tx.Commit()
} }
func (s *Store) aclAuthMethodDeleteTxn(tx *txnWrapper, idx uint64, name string, entMeta *structs.EnterpriseMeta) error { func (s *Store) aclAuthMethodDeleteTxn(tx *txn, idx uint64, name string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing method // Look up the existing method
_, rawMethod, err := s.aclAuthMethodGetByName(tx, name, entMeta) _, rawMethod, err := s.aclAuthMethodGetByName(tx, name, entMeta)
if err != nil { if err != nil {

View File

@ -206,7 +206,7 @@ func authMethodsTableSchema() *memdb.TableSchema {
///// ACL Policy Functions ///// ///// ACL Policy Functions /////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func (s *Store) aclPolicyInsert(tx *txnWrapper, policy *structs.ACLPolicy) error { func (s *Store) aclPolicyInsert(tx *txn, policy *structs.ACLPolicy) error {
if err := tx.Insert("acl-policies", policy); err != nil { if err := tx.Insert("acl-policies", policy); err != nil {
return fmt.Errorf("failed inserting acl policy: %v", err) return fmt.Errorf("failed inserting acl policy: %v", err)
} }
@ -218,19 +218,19 @@ func (s *Store) aclPolicyInsert(tx *txnWrapper, policy *structs.ACLPolicy) error
return nil return nil
} }
func (s *Store) aclPolicyGetByID(tx *txnWrapper, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclPolicyGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-policies", "id", id) return tx.FirstWatch("acl-policies", "id", id)
} }
func (s *Store) aclPolicyGetByName(tx *txnWrapper, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclPolicyGetByName(tx *txn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-policies", "name", name) return tx.FirstWatch("acl-policies", "name", name)
} }
func (s *Store) aclPolicyList(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclPolicyList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-policies", "id") return tx.Get("acl-policies", "id")
} }
func (s *Store) aclPolicyDeleteWithPolicy(tx *txnWrapper, policy *structs.ACLPolicy, idx uint64) error { func (s *Store) aclPolicyDeleteWithPolicy(tx *txn, policy *structs.ACLPolicy, idx uint64) error {
// remove the policy // remove the policy
if err := tx.Delete("acl-policies", policy); err != nil { if err := tx.Delete("acl-policies", policy); err != nil {
return fmt.Errorf("failed deleting acl policy: %v", err) return fmt.Errorf("failed deleting acl policy: %v", err)
@ -243,11 +243,11 @@ func (s *Store) aclPolicyDeleteWithPolicy(tx *txnWrapper, policy *structs.ACLPol
return nil return nil
} }
func (s *Store) aclPolicyMaxIndex(tx *txnWrapper, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 { func (s *Store) aclPolicyMaxIndex(tx *txn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-policies") return maxIndexTxn(tx, "acl-policies")
} }
func (s *Store) aclPolicyUpsertValidateEnterprise(*txnWrapper, *structs.ACLPolicy, *structs.ACLPolicy) error { func (s *Store) aclPolicyUpsertValidateEnterprise(*txn, *structs.ACLPolicy, *structs.ACLPolicy) error {
return nil return nil
} }
@ -259,7 +259,7 @@ func (s *Store) ACLPolicyUpsertValidateEnterprise(*structs.ACLPolicy, *structs.A
///// ACL Token Functions ///// ///// ACL Token Functions /////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func (s *Store) aclTokenInsert(tx *txnWrapper, token *structs.ACLToken) error { func (s *Store) aclTokenInsert(tx *txn, token *structs.ACLToken) error {
// insert the token into memdb // insert the token into memdb
if err := tx.Insert("acl-tokens", token); err != nil { if err := tx.Insert("acl-tokens", token); err != nil {
return fmt.Errorf("failed inserting acl token: %v", err) return fmt.Errorf("failed inserting acl token: %v", err)
@ -273,35 +273,35 @@ func (s *Store) aclTokenInsert(tx *txnWrapper, token *structs.ACLToken) error {
return nil return nil
} }
func (s *Store) aclTokenGetFromIndex(tx *txnWrapper, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclTokenGetFromIndex(tx *txn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-tokens", index, id) return tx.FirstWatch("acl-tokens", index, id)
} }
func (s *Store) aclTokenListAll(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListAll(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "id") return tx.Get("acl-tokens", "id")
} }
func (s *Store) aclTokenListLocal(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListLocal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", true) return tx.Get("acl-tokens", "local", true)
} }
func (s *Store) aclTokenListGlobal(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", false) return tx.Get("acl-tokens", "local", false)
} }
func (s *Store) aclTokenListByPolicy(tx *txnWrapper, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "policies", policy) return tx.Get("acl-tokens", "policies", policy)
} }
func (s *Store) aclTokenListByRole(tx *txnWrapper, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "roles", role) return tx.Get("acl-tokens", "roles", role)
} }
func (s *Store) aclTokenListByAuthMethod(tx *txnWrapper, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclTokenListByAuthMethod(tx *txn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "authmethod", authMethod) return tx.Get("acl-tokens", "authmethod", authMethod)
} }
func (s *Store) aclTokenDeleteWithToken(tx *txnWrapper, token *structs.ACLToken, idx uint64) error { func (s *Store) aclTokenDeleteWithToken(tx *txn, token *structs.ACLToken, idx uint64) error {
// remove the token // remove the token
if err := tx.Delete("acl-tokens", token); err != nil { if err := tx.Delete("acl-tokens", token); err != nil {
return fmt.Errorf("failed deleting acl token: %v", err) return fmt.Errorf("failed deleting acl token: %v", err)
@ -314,11 +314,11 @@ func (s *Store) aclTokenDeleteWithToken(tx *txnWrapper, token *structs.ACLToken,
return nil return nil
} }
func (s *Store) aclTokenMaxIndex(tx *txnWrapper, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) aclTokenMaxIndex(tx *txn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-tokens") return maxIndexTxn(tx, "acl-tokens")
} }
func (s *Store) aclTokenUpsertValidateEnterprise(tx *txnWrapper, token *structs.ACLToken, existing *structs.ACLToken) error { func (s *Store) aclTokenUpsertValidateEnterprise(tx *txn, token *structs.ACLToken, existing *structs.ACLToken) error {
return nil return nil
} }
@ -330,7 +330,7 @@ func (s *Store) ACLTokenUpsertValidateEnterprise(token *structs.ACLToken, existi
///// ACL Role Functions ///// ///// ACL Role Functions /////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func (s *Store) aclRoleInsert(tx *txnWrapper, role *structs.ACLRole) error { func (s *Store) aclRoleInsert(tx *txn, role *structs.ACLRole) error {
// insert the role into memdb // insert the role into memdb
if err := tx.Insert("acl-roles", role); err != nil { if err := tx.Insert("acl-roles", role); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err) return fmt.Errorf("failed inserting acl role: %v", err)
@ -343,23 +343,23 @@ func (s *Store) aclRoleInsert(tx *txnWrapper, role *structs.ACLRole) error {
return nil return nil
} }
func (s *Store) aclRoleGetByID(tx *txnWrapper, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclRoleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-roles", "id", id) return tx.FirstWatch("acl-roles", "id", id)
} }
func (s *Store) aclRoleGetByName(tx *txnWrapper, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclRoleGetByName(tx *txn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-roles", "name", name) return tx.FirstWatch("acl-roles", "name", name)
} }
func (s *Store) aclRoleList(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "id") return tx.Get("acl-roles", "id")
} }
func (s *Store) aclRoleListByPolicy(tx *txnWrapper, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "policies", policy) return tx.Get("acl-roles", "policies", policy)
} }
func (s *Store) aclRoleDeleteWithRole(tx *txnWrapper, role *structs.ACLRole, idx uint64) error { func (s *Store) aclRoleDeleteWithRole(tx *txn, role *structs.ACLRole, idx uint64) error {
// remove the role // remove the role
if err := tx.Delete("acl-roles", role); err != nil { if err := tx.Delete("acl-roles", role); err != nil {
return fmt.Errorf("failed deleting acl role: %v", err) return fmt.Errorf("failed deleting acl role: %v", err)
@ -372,11 +372,11 @@ func (s *Store) aclRoleDeleteWithRole(tx *txnWrapper, role *structs.ACLRole, idx
return nil return nil
} }
func (s *Store) aclRoleMaxIndex(tx *txnWrapper, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 { func (s *Store) aclRoleMaxIndex(tx *txn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-roles") return maxIndexTxn(tx, "acl-roles")
} }
func (s *Store) aclRoleUpsertValidateEnterprise(tx *txnWrapper, role *structs.ACLRole, existing *structs.ACLRole) error { func (s *Store) aclRoleUpsertValidateEnterprise(tx *txn, role *structs.ACLRole, existing *structs.ACLRole) error {
return nil return nil
} }
@ -388,7 +388,7 @@ func (s *Store) ACLRoleUpsertValidateEnterprise(role *structs.ACLRole, existing
///// ACL Binding Rule Functions ///// ///// ACL Binding Rule Functions /////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func (s *Store) aclBindingRuleInsert(tx *txnWrapper, rule *structs.ACLBindingRule) error { func (s *Store) aclBindingRuleInsert(tx *txn, rule *structs.ACLBindingRule) error {
// insert the role into memdb // insert the role into memdb
if err := tx.Insert("acl-binding-rules", rule); err != nil { if err := tx.Insert("acl-binding-rules", rule); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err) return fmt.Errorf("failed inserting acl role: %v", err)
@ -402,19 +402,19 @@ func (s *Store) aclBindingRuleInsert(tx *txnWrapper, rule *structs.ACLBindingRul
return nil return nil
} }
func (s *Store) aclBindingRuleGetByID(tx *txnWrapper, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclBindingRuleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-binding-rules", "id", id) return tx.FirstWatch("acl-binding-rules", "id", id)
} }
func (s *Store) aclBindingRuleList(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclBindingRuleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "id") return tx.Get("acl-binding-rules", "id")
} }
func (s *Store) aclBindingRuleListByAuthMethod(tx *txnWrapper, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclBindingRuleListByAuthMethod(tx *txn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "authmethod", method) return tx.Get("acl-binding-rules", "authmethod", method)
} }
func (s *Store) aclBindingRuleDeleteWithRule(tx *txnWrapper, rule *structs.ACLBindingRule, idx uint64) error { func (s *Store) aclBindingRuleDeleteWithRule(tx *txn, rule *structs.ACLBindingRule, idx uint64) error {
// remove the rule // remove the rule
if err := tx.Delete("acl-binding-rules", rule); err != nil { if err := tx.Delete("acl-binding-rules", rule); err != nil {
return fmt.Errorf("failed deleting acl binding rule: %v", err) return fmt.Errorf("failed deleting acl binding rule: %v", err)
@ -427,11 +427,11 @@ func (s *Store) aclBindingRuleDeleteWithRule(tx *txnWrapper, rule *structs.ACLBi
return nil return nil
} }
func (s *Store) aclBindingRuleMaxIndex(tx *txnWrapper, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) aclBindingRuleMaxIndex(tx *txn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-binding-rules") return maxIndexTxn(tx, "acl-binding-rules")
} }
func (s *Store) aclBindingRuleUpsertValidateEnterprise(tx *txnWrapper, rule *structs.ACLBindingRule, existing *structs.ACLBindingRule) error { func (s *Store) aclBindingRuleUpsertValidateEnterprise(tx *txn, rule *structs.ACLBindingRule, existing *structs.ACLBindingRule) error {
return nil return nil
} }
@ -443,7 +443,7 @@ func (s *Store) ACLBindingRuleUpsertValidateEnterprise(rule *structs.ACLBindingR
///// ACL Auth Method Functions ///// ///// ACL Auth Method Functions /////
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func (s *Store) aclAuthMethodInsert(tx *txnWrapper, method *structs.ACLAuthMethod) error { func (s *Store) aclAuthMethodInsert(tx *txn, method *structs.ACLAuthMethod) error {
// insert the role into memdb // insert the role into memdb
if err := tx.Insert("acl-auth-methods", method); err != nil { if err := tx.Insert("acl-auth-methods", method); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err) return fmt.Errorf("failed inserting acl role: %v", err)
@ -457,15 +457,15 @@ func (s *Store) aclAuthMethodInsert(tx *txnWrapper, method *structs.ACLAuthMetho
return nil return nil
} }
func (s *Store) aclAuthMethodGetByName(tx *txnWrapper, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) aclAuthMethodGetByName(tx *txn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-auth-methods", "id", method) return tx.FirstWatch("acl-auth-methods", "id", method)
} }
func (s *Store) aclAuthMethodList(tx *txnWrapper, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) aclAuthMethodList(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-auth-methods", "id") return tx.Get("acl-auth-methods", "id")
} }
func (s *Store) aclAuthMethodDeleteWithMethod(tx *txnWrapper, method *structs.ACLAuthMethod, idx uint64) error { func (s *Store) aclAuthMethodDeleteWithMethod(tx *txn, method *structs.ACLAuthMethod, idx uint64) error {
// remove the method // remove the method
if err := tx.Delete("acl-auth-methods", method); err != nil { if err := tx.Delete("acl-auth-methods", method); err != nil {
return fmt.Errorf("failed deleting acl auth method: %v", err) return fmt.Errorf("failed deleting acl auth method: %v", err)
@ -478,11 +478,11 @@ func (s *Store) aclAuthMethodDeleteWithMethod(tx *txnWrapper, method *structs.AC
return nil return nil
} }
func (s *Store) aclAuthMethodMaxIndex(tx *txnWrapper, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) aclAuthMethodMaxIndex(tx *txn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-auth-methods") return maxIndexTxn(tx, "acl-auth-methods")
} }
func (s *Store) aclAuthMethodUpsertValidateEnterprise(tx *txnWrapper, method *structs.ACLAuthMethod, existing *structs.ACLAuthMethod) error { func (s *Store) aclAuthMethodUpsertValidateEnterprise(tx *txn, method *structs.ACLAuthMethod, existing *structs.ACLAuthMethod) error {
return nil return nil
} }

View File

@ -4105,7 +4105,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
}, },
} }
_, err := s.resolveACLLinks(tx, links, func(*txnWrapper, string) (string, error) { _, err := s.resolveACLLinks(tx, links, func(*txn, string) (string, error) {
err := fmt.Errorf("Should not be attempting to resolve an empty id") err := fmt.Errorf("Should not be attempting to resolve an empty id")
require.Fail(t, err.Error()) require.Fail(t, err.Error())
return "", err return "", err
@ -4131,7 +4131,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
}, },
} }
numValid, err := s.resolveACLLinks(tx, links, func(_ *txnWrapper, linkID string) (string, error) { numValid, err := s.resolveACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
switch linkID { switch linkID {
case "e81887b4-836b-4053-a1fa-7e8305902be9": case "e81887b4-836b-4053-a1fa-7e8305902be9":
return "foo", nil return "foo", nil
@ -4161,7 +4161,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
}, },
} }
numValid, err := s.resolveACLLinks(tx, links, func(_ *txnWrapper, linkID string) (string, error) { numValid, err := s.resolveACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
require.Equal(t, "b985e082-25d3-45a9-9dd8-fd1a41b83b0d", linkID) require.Equal(t, "b985e082-25d3-45a9-9dd8-fd1a41b83b0d", linkID)
return "", nil return "", nil
}) })
@ -4201,7 +4201,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txnWrapper, linkID string) (string, error) { newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
switch linkID { switch linkID {
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd": case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
return "foo", nil return "foo", nil
@ -4228,7 +4228,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txnWrapper, linkID string) (string, error) { newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
switch linkID { switch linkID {
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd": case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
return "foo", nil return "foo", nil
@ -4260,7 +4260,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txnWrapper, linkID string) (string, error) { newLinks, cloned, err := s.fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
switch linkID { switch linkID {
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd": case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
return "foo", nil return "foo", nil
@ -4287,7 +4287,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
_, _, err := s.fixupACLLinks(tx, links, func(*txnWrapper, string) (string, error) { _, _, err := s.fixupACLLinks(tx, links, func(*txn, string) (string, error) {
return "", fmt.Errorf("Resolver Error") return "", fmt.Errorf("Resolver Error")
}) })

View File

@ -113,7 +113,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (
return err == nil, err return err == nil, err
} }
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *txnWrapper, config *autopilot.Config) error { func (s *Store) autopilotSetConfigTxn(idx uint64, tx *txn, config *autopilot.Config) error {
// Check for an existing config // Check for an existing config
existing, err := tx.First("autopilot-config", "id") existing, err := tx.First("autopilot-config", "id")
if err != nil { if err != nil {

View File

@ -236,7 +236,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
return tx.Commit() return tx.Commit()
} }
func (s *Store) ensureCheckIfNodeMatches(tx *txnWrapper, idx uint64, node string, check *structs.HealthCheck) error { func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, node string, check *structs.HealthCheck) error {
if check.Node != node { if check.Node != node {
return fmt.Errorf("check node %q does not match node %q", return fmt.Errorf("check node %q does not match node %q",
check.Node, node) check.Node, node)
@ -250,7 +250,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *txnWrapper, idx uint64, node string
// ensureRegistrationTxn is used to make sure a node, service, and check // ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race // registration is performed within a single transaction to avoid race
// conditions on state updates. // conditions on state updates.
func (s *Store) ensureRegistrationTxn(tx *txnWrapper, idx uint64, req *structs.RegisterRequest) error { func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, req *structs.RegisterRequest) error {
if _, err := s.validateRegisterRequestTxn(tx, req); err != nil { if _, err := s.validateRegisterRequestTxn(tx, req); err != nil {
return err return err
} }
@ -328,7 +328,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name // ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed // If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *txnWrapper, node *structs.Node, allowClashWithoutID bool) error { func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWithoutID bool) error {
// Retrieve all of the nodes // Retrieve all of the nodes
enodes, err := tx.Get("nodes", "id") enodes, err := tx.Get("nodes", "id")
if err != nil { if err != nil {
@ -364,7 +364,7 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *txnWrapper, node *structs.Nod
// ensureNodeCASTxn updates a node only if the existing index matches the given index. // ensureNodeCASTxn updates a node only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error. // Returns a bool indicating if a write happened and any error.
func (s *Store) ensureNodeCASTxn(tx *txnWrapper, idx uint64, node *structs.Node) (bool, error) { func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
existing, err := getNodeTxn(tx, node.Node) existing, err := getNodeTxn(tx, node.Node)
if err != nil { if err != nil {
@ -394,7 +394,7 @@ func (s *Store) ensureNodeCASTxn(tx *txnWrapper, idx uint64, node *structs.Node)
// ensureNodeTxn is the inner function called to actually create a node // ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows // registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn. // passing in a memdb transaction so it may be part of a larger txn.
func (s *Store) ensureNodeTxn(tx *txnWrapper, idx uint64, node *structs.Node) error { func (s *Store) ensureNodeTxn(tx *txn, idx uint64, node *structs.Node) error {
// See if there's an existing node with this UUID, and make sure the // See if there's an existing node with this UUID, and make sure the
// name is the same. // name is the same.
var n *structs.Node var n *structs.Node
@ -492,7 +492,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
return idx, node, nil return idx, node, nil
} }
func getNodeTxn(tx *txnWrapper, nodeName string) (*structs.Node, error) { func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) {
node, err := tx.First("nodes", "id", nodeName) node, err := tx.First("nodes", "id", nodeName)
if err != nil { if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err) return nil, fmt.Errorf("node lookup failed: %s", err)
@ -503,7 +503,7 @@ func getNodeTxn(tx *txnWrapper, nodeName string) (*structs.Node, error) {
return nil, nil return nil, nil
} }
func getNodeIDTxn(tx *txnWrapper, id types.NodeID) (*structs.Node, error) { func getNodeIDTxn(tx *txn, id types.NodeID) (*structs.Node, error) {
strnode := string(id) strnode := string(id)
uuidValue, err := uuid.ParseUUID(strnode) uuidValue, err := uuid.ParseUUID(strnode)
if err != nil { if err != nil {
@ -603,7 +603,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
// deleteNodeCASTxn is used to try doing a node delete operation with a given // deleteNodeCASTxn is used to try doing a node delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked. // the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteNodeCASTxn(tx *txnWrapper, idx, cidx uint64, nodeName string) (bool, error) { func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bool, error) {
// Look up the node. // Look up the node.
node, err := getNodeTxn(tx, nodeName) node, err := getNodeTxn(tx, nodeName)
if err != nil { if err != nil {
@ -630,7 +630,7 @@ func (s *Store) deleteNodeCASTxn(tx *txnWrapper, idx, cidx uint64, nodeName stri
// deleteNodeTxn is the inner method used for removing a node from // deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction. // the store within a given transaction.
func (s *Store) deleteNodeTxn(tx *txnWrapper, idx uint64, nodeName string) error { func (s *Store) deleteNodeTxn(tx *txn, idx uint64, nodeName string) error {
// Look up the node. // Look up the node.
node, err := tx.First("nodes", "id", nodeName) node, err := tx.First("nodes", "id", nodeName)
if err != nil { if err != nil {
@ -737,7 +737,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
// ensureServiceCASTxn updates a service only if the existing index matches the given index. // ensureServiceCASTxn updates a service only if the existing index matches the given index.
// Returns an error if the write didn't happen and nil if write was successful. // Returns an error if the write didn't happen and nil if write was successful.
func (s *Store) ensureServiceCASTxn(tx *txnWrapper, idx uint64, node string, svc *structs.NodeService) error { func (s *Store) ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error {
// Retrieve the existing service. // Retrieve the existing service.
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil { if err != nil {
@ -762,7 +762,7 @@ func (s *Store) ensureServiceCASTxn(tx *txnWrapper, idx uint64, node string, svc
// ensureServiceTxn is used to upsert a service registration within an // ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction. // existing memdb transaction.
func (s *Store) ensureServiceTxn(tx *txnWrapper, idx uint64, node string, svc *structs.NodeService) error { func (s *Store) ensureServiceTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error {
// Check for existing service // Check for existing service
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil { if err != nil {
@ -859,7 +859,7 @@ func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta)
return s.serviceListTxn(tx, ws, entMeta) return s.serviceListTxn(tx, ws, entMeta)
} }
func (s *Store) serviceListTxn(tx *txnWrapper, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { func (s *Store) serviceListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
idx := s.catalogServicesMaxIndex(tx, entMeta) idx := s.catalogServicesMaxIndex(tx, entMeta)
services, err := s.catalogServiceList(tx, entMeta, true) services, err := s.catalogServiceList(tx, entMeta, true)
@ -963,7 +963,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
// * return when the last instance of a service is removed // * return when the last instance of a service is removed
// * block until an instance for this service is available, or another // * block until an instance for this service is available, or another
// service is unregistered. // service is unregistered.
func (s *Store) maxIndexForService(tx *txnWrapper, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 {
idx, _ := s.maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta) idx, _ := s.maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta)
return idx return idx
} }
@ -982,7 +982,7 @@ func (s *Store) maxIndexForService(tx *txnWrapper, serviceName string, serviceEx
// returned for the chan. This allows for blocking watchers to _only_ watch this // returned for the chan. This allows for blocking watchers to _only_ watch this
// one chan in the common case, falling back to watching all touched MemDB // one chan in the common case, falling back to watching all touched MemDB
// indexes in more complicated cases. // indexes in more complicated cases.
func (s *Store) maxIndexAndWatchChForService(tx *txnWrapper, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) { func (s *Store) maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) {
if !serviceExists { if !serviceExists {
res, err := s.catalogServiceLastExtinctionIndex(tx, entMeta) res, err := s.catalogServiceLastExtinctionIndex(tx, entMeta)
if missingIdx, ok := res.(*IndexEntry); ok && err == nil { if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
@ -999,7 +999,7 @@ func (s *Store) maxIndexAndWatchChForService(tx *txnWrapper, serviceName string,
} }
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes // Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *txnWrapper, func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *txn,
nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) { nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) {
var watchChans []<-chan struct{} var watchChans []<-chan struct{}
@ -1206,7 +1206,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
// parseServiceNodes iterates over a services query and fills in the node details, // parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice. // returning a ServiceNodes slice.
func (s *Store) parseServiceNodes(tx *txnWrapper, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { func (s *Store) parseServiceNodes(tx *txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a // We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback. // top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id") allNodes, err := tx.Get("nodes", "id")
@ -1263,7 +1263,7 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
return idx, service, nil return idx, service, nil
} }
func (s *Store) getNodeServiceTxn(tx *txnWrapper, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { func (s *Store) getNodeServiceTxn(tx *txn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// Query the service // Query the service
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil { if err != nil {
@ -1405,7 +1405,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s
// deleteServiceCASTxn is used to try doing a service delete operation with a given // deleteServiceCASTxn is used to try doing a service delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given service, then the call is a noop, otherwise a normal delete is invoked. // the given service, then the call is a noop, otherwise a normal delete is invoked.
func (s *Store) deleteServiceCASTxn(tx *txnWrapper, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Look up the service. // Look up the service.
service, err := s.getNodeServiceTxn(tx, nodeName, serviceID, entMeta) service, err := s.getNodeServiceTxn(tx, nodeName, serviceID, entMeta)
if err != nil { if err != nil {
@ -1432,7 +1432,7 @@ func (s *Store) deleteServiceCASTxn(tx *txnWrapper, idx, cidx uint64, nodeName,
// deleteServiceTxn is the inner method called to remove a service // deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction. // registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *txnWrapper, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
// Look up the service. // Look up the service.
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil { if err != nil {
@ -1540,7 +1540,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
} }
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func (s *Store) updateAllServiceIndexesOfNode(tx *txnWrapper, idx uint64, nodeID string) error { func (s *Store) updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error {
services, err := tx.Get("services", "node", nodeID) services, err := tx.Get("services", "node", nodeID)
if err != nil { if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
@ -1559,7 +1559,7 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *txnWrapper, idx uint64, nodeID
// ensureCheckCASTxn updates a check only if the existing index matches the given index. // ensureCheckCASTxn updates a check only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error. // Returns a bool indicating if a write happened and any error.
func (s *Store) ensureCheckCASTxn(tx *txnWrapper, idx uint64, hc *structs.HealthCheck) (bool, error) { func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
_, existing, err := s.getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta) _, existing, err := s.getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta)
if err != nil { if err != nil {
@ -1589,7 +1589,7 @@ func (s *Store) ensureCheckCASTxn(tx *txnWrapper, idx uint64, hc *structs.Health
// ensureCheckTxn is used as the inner method to handle inserting // ensureCheckTxn is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting // a health check into the state store. It ensures safety against inserting
// checks with no matching node or service. // checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx *txnWrapper, idx uint64, hc *structs.HealthCheck) error { func (s *Store) ensureCheckTxn(tx *txn, idx uint64, hc *structs.HealthCheck) error {
// Check if we have an existing health check // Check if we have an existing health check
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil { if err != nil {
@ -1692,7 +1692,7 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID, entMeta *struc
// nodeCheckTxn is used as the inner method to handle reading a health check // nodeCheckTxn is used as the inner method to handle reading a health check
// from the state store. // from the state store.
func (s *Store) getNodeCheckTxn(tx *txnWrapper, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) { func (s *Store) getNodeCheckTxn(tx *txn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) {
// Get the table index. // Get the table index.
idx := s.catalogChecksMaxIndex(tx, entMeta) idx := s.catalogChecksMaxIndex(tx, entMeta)
@ -1808,7 +1808,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters
return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters) return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters)
} }
func (s *Store) checksInStateTxn(tx *txnWrapper, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) { func (s *Store) checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) {
// Get the table index. // Get the table index.
idx := s.catalogChecksMaxIndex(tx, entMeta) idx := s.catalogChecksMaxIndex(tx, entMeta)
@ -1830,7 +1830,7 @@ func (s *Store) checksInStateTxn(tx *txnWrapper, ws memdb.WatchSet, state string
// parseChecksByNodeMeta is a helper function used to deduplicate some // parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields. // repetitive code for returning health checks filtered by node metadata fields.
func (s *Store) parseChecksByNodeMeta(tx *txnWrapper, ws memdb.WatchSet, func (s *Store) parseChecksByNodeMeta(tx *txn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a // We don't want to track an unlimited number of nodes, so we pull a
@ -1879,7 +1879,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM
// deleteCheckCASTxn is used to try doing a check delete operation with a given // deleteCheckCASTxn is used to try doing a check delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked. // the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteCheckCASTxn(tx *txnWrapper, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) {
// Try to retrieve the existing health check. // Try to retrieve the existing health check.
_, hc, err := s.getNodeCheckTxn(tx, node, checkID, entMeta) _, hc, err := s.getNodeCheckTxn(tx, node, checkID, entMeta)
if err != nil { if err != nil {
@ -1906,7 +1906,7 @@ func (s *Store) deleteCheckCASTxn(tx *txnWrapper, idx, cidx uint64, node string,
// deleteCheckTxn is the inner method used to call a health // deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction. // check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx *txnWrapper, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
// Try to retrieve the existing health check. // Try to retrieve the existing health check.
_, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID)) _, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID))
if err != nil { if err != nil {
@ -2023,7 +2023,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
return s.checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta) return s.checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta)
} }
func (s *Store) checkServiceNodesTxn(tx *txnWrapper, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func (s *Store) checkServiceNodesTxn(tx *txn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Function for lookup // Function for lookup
index := "service" index := "service"
if connect { if connect {
@ -2213,7 +2213,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
// and query for an associated node and a set of checks. This is the inner // and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query. // method used to return a rich set of results from a more simple query.
func (s *Store) parseCheckServiceNodes( func (s *Store) parseCheckServiceNodes(
tx *txnWrapper, ws memdb.WatchSet, idx uint64, tx *txn, ws memdb.WatchSet, idx uint64,
serviceName string, services structs.ServiceNodes, serviceName string, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) { err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil { if err != nil {
@ -2338,7 +2338,7 @@ func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind
} }
} }
func (s *Store) serviceDumpAllTxn(tx *txnWrapper, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func (s *Store) serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Get the table index // Get the table index
idx := s.catalogMaxIndexWatch(tx, ws, entMeta, true) idx := s.catalogMaxIndexWatch(tx, ws, entMeta, true)
@ -2356,7 +2356,7 @@ func (s *Store) serviceDumpAllTxn(tx *txnWrapper, ws memdb.WatchSet, entMeta *st
return s.parseCheckServiceNodes(tx, nil, idx, "", results, err) return s.parseCheckServiceNodes(tx, nil, idx, "", results, err)
} }
func (s *Store) serviceDumpKindTxn(tx *txnWrapper, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func (s *Store) serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks) // unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks)
// updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual // updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual
// entries // entries
@ -2380,7 +2380,7 @@ func (s *Store) serviceDumpKindTxn(tx *txnWrapper, ws memdb.WatchSet, kind struc
// parseNodes takes an iterator over a set of nodes and returns a struct // parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services // containing the nodes along with all of their associated services
// and/or health checks. // and/or health checks.
func (s *Store) parseNodes(tx *txnWrapper, ws memdb.WatchSet, idx uint64, func (s *Store) parseNodes(tx *txn, ws memdb.WatchSet, idx uint64,
iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) { iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) {
// We don't want to track an unlimited number of services, so we pull a // We don't want to track an unlimited number of services, so we pull a
@ -2440,7 +2440,7 @@ func (s *Store) parseNodes(tx *txnWrapper, ws memdb.WatchSet, idx uint64,
} }
// checkSessionsTxn returns the IDs of all sessions associated with a health check // checkSessionsTxn returns the IDs of all sessions associated with a health check
func checkSessionsTxn(tx *txnWrapper, hc *structs.HealthCheck) ([]*sessionCheck, error) { func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error) {
mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed session checks lookup: %s", err) return nil, fmt.Errorf("failed session checks lookup: %s", err)
@ -2454,7 +2454,7 @@ func checkSessionsTxn(tx *txnWrapper, hc *structs.HealthCheck) ([]*sessionCheck,
} }
// updateGatewayServices associates services with gateways as specified in a gateway config entry // updateGatewayServices associates services with gateways as specified in a gateway config entry
func (s *Store) updateGatewayServices(tx *txnWrapper, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error { func (s *Store) updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
var ( var (
noChange bool noChange bool
gatewayServices structs.GatewayServices gatewayServices structs.GatewayServices
@ -2511,7 +2511,7 @@ func (s *Store) updateGatewayServices(tx *txnWrapper, idx uint64, conf structs.C
// insertion into the memdb table, specific to ingress gateways. The boolean // insertion into the memdb table, specific to ingress gateways. The boolean
// returned indicates that there are no changes necessary to the memdb table. // returned indicates that there are no changes necessary to the memdb table.
func (s *Store) ingressConfigGatewayServices( func (s *Store) ingressConfigGatewayServices(
tx *txnWrapper, tx *txn,
gateway structs.ServiceName, gateway structs.ServiceName,
conf structs.ConfigEntry, conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
@ -2556,7 +2556,7 @@ func (s *Store) ingressConfigGatewayServices(
// boolean returned indicates that there are no changes necessary to the memdb // boolean returned indicates that there are no changes necessary to the memdb
// table. // table.
func (s *Store) terminatingConfigGatewayServices( func (s *Store) terminatingConfigGatewayServices(
tx *txnWrapper, tx *txn,
gateway structs.ServiceName, gateway structs.ServiceName,
conf structs.ConfigEntry, conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
@ -2596,7 +2596,7 @@ func (s *Store) terminatingConfigGatewayServices(
} }
// updateGatewayNamespace is used to target all services within a namespace // updateGatewayNamespace is used to target all services within a namespace
func (s *Store) updateGatewayNamespace(tx *txnWrapper, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error { func (s *Store) updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
services, err := s.catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta) services, err := s.catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta)
if err != nil { if err != nil {
return fmt.Errorf("failed querying services: %s", err) return fmt.Errorf("failed querying services: %s", err)
@ -2643,7 +2643,7 @@ func (s *Store) updateGatewayNamespace(tx *txnWrapper, idx uint64, service *stru
// updateGatewayService associates services with gateways after an eligible event // updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway // ie. Registering a service in a namespace targeted by a gateway
func (s *Store) updateGatewayService(tx *txnWrapper, idx uint64, mapping *structs.GatewayService) error { func (s *Store) updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table // Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed // Avoid insert if nothing changed
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port) existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port)
@ -2674,7 +2674,7 @@ func (s *Store) updateGatewayService(tx *txnWrapper, idx uint64, mapping *struct
// checkWildcardForGatewaysAndUpdate checks whether a service matches a // checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the // wildcard definition in gateway config entries and if so adds it the the
// gateway-services table. // gateway-services table.
func (s *Store) checkGatewayWildcardsAndUpdate(tx *txnWrapper, idx uint64, svc *structs.NodeService) error { func (s *Store) checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeService) error {
// Do not associate non-typical services with gateways or consul services // Do not associate non-typical services with gateways or consul services
if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" { if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" {
return nil return nil
@ -2703,18 +2703,18 @@ func (s *Store) checkGatewayWildcardsAndUpdate(tx *txnWrapper, idx uint64, svc *
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service. // all the gateways mapped to this service.
func (s *Store) serviceGateways(tx *txnWrapper, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta)) return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta))
} }
func (s *Store) gatewayServices(tx *txnWrapper, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) gatewayServices(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)) return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta))
} }
// TODO(ingress): How to handle index rolling back when a config entry is // TODO(ingress): How to handle index rolling back when a config entry is
// deleted that references a service? // deleted that references a service?
// We might need something like the service_last_extinction index? // We might need something like the service_last_extinction index?
func (s *Store) serviceGatewayNodes(tx *txnWrapper, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { func (s *Store) serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Look up gateway name associated with the service // Look up gateway name associated with the service
gws, err := s.serviceGateways(tx, service, entMeta) gws, err := s.serviceGateways(tx, service, entMeta)
if err != nil { if err != nil {
@ -2766,7 +2766,7 @@ func (s *Store) serviceGatewayNodes(tx *txnWrapper, ws memdb.WatchSet, service s
// checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol // checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol
// that doesn't match the one configured in their discovery chain. // that doesn't match the one configured in their discovery chain.
func (s *Store) checkProtocolMatch( func (s *Store) checkProtocolMatch(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
svc *structs.GatewayService, svc *structs.GatewayService,
) (uint64, bool, error) { ) (uint64, bool, error) {

View File

@ -4,6 +4,7 @@ package state
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
) )
@ -167,7 +168,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
} }
} }
func (s *Store) catalogUpdateServicesIndexes(tx *txnWrapper, idx uint64, _ *structs.EnterpriseMeta) error { func (s *Store) catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
// overall services index // overall services index
if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil { if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -176,7 +177,7 @@ func (s *Store) catalogUpdateServicesIndexes(tx *txnWrapper, idx uint64, _ *stru
return nil return nil
} }
func (s *Store) catalogUpdateServiceKindIndexes(tx *txnWrapper, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error { func (s *Store) catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error {
// service-kind index // service-kind index
if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil { if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -185,7 +186,7 @@ func (s *Store) catalogUpdateServiceKindIndexes(tx *txnWrapper, kind structs.Ser
return nil return nil
} }
func (s *Store) catalogUpdateServiceIndexes(tx *txnWrapper, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error { func (s *Store) catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error {
// per-service index // per-service index
if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil { if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -194,14 +195,14 @@ func (s *Store) catalogUpdateServiceIndexes(tx *txnWrapper, serviceName string,
return nil return nil
} }
func (s *Store) catalogUpdateServiceExtinctionIndex(tx *txnWrapper, idx uint64, _ *structs.EnterpriseMeta) error { func (s *Store) catalogUpdateServiceExtinctionIndex(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err) return fmt.Errorf("failed updating missing service extinction index: %s", err)
} }
return nil return nil
} }
func (s *Store) catalogInsertService(tx *txnWrapper, svc *structs.ServiceNode) error { func (s *Store) catalogInsertService(tx *txn, svc *structs.ServiceNode) error {
// Insert the service and update the index // Insert the service and update the index
if err := tx.Insert("services", svc); err != nil { if err := tx.Insert("services", svc); err != nil {
return fmt.Errorf("failed inserting service: %s", err) return fmt.Errorf("failed inserting service: %s", err)
@ -222,53 +223,53 @@ func (s *Store) catalogInsertService(tx *txnWrapper, svc *structs.ServiceNode) e
return nil return nil
} }
func (s *Store) catalogServicesMaxIndex(tx *txnWrapper, _ *structs.EnterpriseMeta) uint64 { func (s *Store) catalogServicesMaxIndex(tx *txn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "services") return maxIndexTxn(tx, "services")
} }
func (s *Store) catalogServiceMaxIndex(tx *txnWrapper, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func (s *Store) catalogServiceMaxIndex(tx *txn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil)) return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil))
} }
func (s *Store) catalogServiceKindMaxIndex(tx *txnWrapper, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) catalogServiceKindMaxIndex(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil)) return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil))
} }
func (s *Store) catalogServiceList(tx *txnWrapper, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { func (s *Store) catalogServiceList(tx *txn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
return tx.Get("services", "id") return tx.Get("services", "id")
} }
func (s *Store) catalogServiceListByKind(tx *txnWrapper, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogServiceListByKind(tx *txn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("services", "kind", string(kind)) return tx.Get("services", "kind", string(kind))
} }
func (s *Store) catalogServiceListByNode(tx *txnWrapper, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { func (s *Store) catalogServiceListByNode(tx *txn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
return tx.Get("services", "node", node) return tx.Get("services", "node", node)
} }
func (s *Store) catalogServiceNodeList(tx *txnWrapper, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogServiceNodeList(tx *txn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("services", index, name) return tx.Get("services", index, name)
} }
func (s *Store) catalogServiceLastExtinctionIndex(tx *txnWrapper, _ *structs.EnterpriseMeta) (interface{}, error) { func (s *Store) catalogServiceLastExtinctionIndex(tx *txn, _ *structs.EnterpriseMeta) (interface{}, error) {
return tx.First("index", "id", serviceLastExtinctionIndexName) return tx.First("index", "id", serviceLastExtinctionIndexName)
} }
func (s *Store) catalogMaxIndex(tx *txnWrapper, _ *structs.EnterpriseMeta, checks bool) uint64 { func (s *Store) catalogMaxIndex(tx *txn, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks { if checks {
return maxIndexTxn(tx, "nodes", "services", "checks") return maxIndexTxn(tx, "nodes", "services", "checks")
} }
return maxIndexTxn(tx, "nodes", "services") return maxIndexTxn(tx, "nodes", "services")
} }
func (s *Store) catalogMaxIndexWatch(tx *txnWrapper, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 { func (s *Store) catalogMaxIndexWatch(tx *txn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
if checks { if checks {
return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks") return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks")
} }
return maxIndexWatchTxn(tx, ws, "nodes", "services") return maxIndexWatchTxn(tx, ws, "nodes", "services")
} }
func (s *Store) catalogUpdateCheckIndexes(tx *txnWrapper, idx uint64, _ *structs.EnterpriseMeta) error { func (s *Store) catalogUpdateCheckIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry // update the universal index entry
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -276,36 +277,36 @@ func (s *Store) catalogUpdateCheckIndexes(tx *txnWrapper, idx uint64, _ *structs
return nil return nil
} }
func (s *Store) catalogChecksMaxIndex(tx *txnWrapper, _ *structs.EnterpriseMeta) uint64 { func (s *Store) catalogChecksMaxIndex(tx *txn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "checks") return maxIndexTxn(tx, "checks")
} }
func (s *Store) catalogListChecksByNode(tx *txnWrapper, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogListChecksByNode(tx *txn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node", node) return tx.Get("checks", "node", node)
} }
func (s *Store) catalogListChecksByService(tx *txnWrapper, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogListChecksByService(tx *txn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "service", service) return tx.Get("checks", "service", service)
} }
func (s *Store) catalogListChecksInState(tx *txnWrapper, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogListChecksInState(tx *txn, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// simpler than normal due to the use of the CompoundMultiIndex // simpler than normal due to the use of the CompoundMultiIndex
return tx.Get("checks", "status", state) return tx.Get("checks", "status", state)
} }
func (s *Store) catalogListChecks(tx *txnWrapper, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogListChecks(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "id") return tx.Get("checks", "id")
} }
func (s *Store) catalogListNodeChecks(tx *txnWrapper, node string) (memdb.ResultIterator, error) { func (s *Store) catalogListNodeChecks(tx *txn, node string) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service_check", node, false) return tx.Get("checks", "node_service_check", node, false)
} }
func (s *Store) catalogListServiceChecks(tx *txnWrapper, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogListServiceChecks(tx *txn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service", node, service) return tx.Get("checks", "node_service", node, service)
} }
func (s *Store) catalogInsertCheck(tx *txnWrapper, chk *structs.HealthCheck, idx uint64) error { func (s *Store) catalogInsertCheck(tx *txn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check // Insert the check
if err := tx.Insert("checks", chk); err != nil { if err := tx.Insert("checks", chk); err != nil {
return fmt.Errorf("failed inserting check: %s", err) return fmt.Errorf("failed inserting check: %s", err)
@ -318,11 +319,11 @@ func (s *Store) catalogInsertCheck(tx *txnWrapper, chk *structs.HealthCheck, idx
return nil return nil
} }
func (s *Store) catalogChecksForNodeService(tx *txnWrapper, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func (s *Store) catalogChecksForNodeService(tx *txn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node_service", node, service) return tx.Get("checks", "node_service", node, service)
} }
func (s *Store) validateRegisterRequestTxn(tx *txnWrapper, args *structs.RegisterRequest) (*structs.EnterpriseMeta, error) { func (s *Store) validateRegisterRequestTxn(tx *txn, args *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
return nil, nil return nil, nil
} }

View File

@ -106,7 +106,7 @@ func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *struc
return s.configEntryTxn(tx, ws, kind, name, entMeta) return s.configEntryTxn(tx, ws, kind, name, entMeta)
} }
func (s *Store) configEntryTxn(tx *txnWrapper, ws memdb.WatchSet, kind, name string, entMeta *structs.EnterpriseMeta) (uint64, structs.ConfigEntry, error) { func (s *Store) configEntryTxn(tx *txn, ws memdb.WatchSet, kind, name string, entMeta *structs.EnterpriseMeta) (uint64, structs.ConfigEntry, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
@ -141,7 +141,7 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *str
return s.configEntriesByKindTxn(tx, ws, kind, entMeta) return s.configEntriesByKindTxn(tx, ws, kind, entMeta)
} }
func (s *Store) configEntriesByKindTxn(tx *txnWrapper, ws memdb.WatchSet, kind string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) { func (s *Store) configEntriesByKindTxn(tx *txn, ws memdb.WatchSet, kind string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
@ -178,7 +178,7 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry, entMeta
} }
// ensureConfigEntryTxn upserts a config entry inside of a transaction. // ensureConfigEntryTxn upserts a config entry inside of a transaction.
func (s *Store) ensureConfigEntryTxn(tx *txnWrapper, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error { func (s *Store) ensureConfigEntryTxn(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
// Check for existing configuration. // Check for existing configuration.
existing, err := s.firstConfigEntryWithTxn(tx, conf.GetKind(), conf.GetName(), entMeta) existing, err := s.firstConfigEntryWithTxn(tx, conf.GetKind(), conf.GetName(), entMeta)
if err != nil { if err != nil {
@ -296,7 +296,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
return tx.Commit() return tx.Commit()
} }
func (s *Store) insertConfigEntryWithTxn(tx *txnWrapper, idx uint64, conf structs.ConfigEntry) error { func (s *Store) insertConfigEntryWithTxn(tx *txn, idx uint64, conf structs.ConfigEntry) error {
if conf == nil { if conf == nil {
return fmt.Errorf("cannot insert nil config entry") return fmt.Errorf("cannot insert nil config entry")
} }
@ -328,7 +328,7 @@ func (s *Store) insertConfigEntryWithTxn(tx *txnWrapper, idx uint64, conf struct
// May return *ConfigEntryGraphValidationError if there is a concern to surface // May return *ConfigEntryGraphValidationError if there is a concern to surface
// to the caller that they can correct. // to the caller that they can correct.
func (s *Store) validateProposedConfigEntryInGraph( func (s *Store) validateProposedConfigEntryInGraph(
tx *txnWrapper, tx *txn,
idx uint64, idx uint64,
kind, name string, kind, name string,
next structs.ConfigEntry, next structs.ConfigEntry,
@ -369,7 +369,7 @@ func (s *Store) validateProposedConfigEntryInGraph(
} }
func (s *Store) checkGatewayClash( func (s *Store) checkGatewayClash(
tx *txnWrapper, tx *txn,
name, selfKind, otherKind string, name, selfKind, otherKind string,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
) error { ) error {
@ -391,7 +391,7 @@ var serviceGraphKinds = []string{
} }
func (s *Store) validateProposedConfigEntryInServiceGraph( func (s *Store) validateProposedConfigEntryInServiceGraph(
tx *txnWrapper, tx *txn,
idx uint64, idx uint64,
kind, name string, kind, name string,
next structs.ConfigEntry, next structs.ConfigEntry,
@ -447,7 +447,7 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
} }
func (s *Store) testCompileDiscoveryChain( func (s *Store) testCompileDiscoveryChain(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
chainName string, chainName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -510,7 +510,7 @@ func (s *Store) readDiscoveryChainConfigEntries(
} }
func (s *Store) readDiscoveryChainConfigEntriesTxn( func (s *Store) readDiscoveryChainConfigEntriesTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
serviceName string, serviceName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -699,7 +699,7 @@ func anyKey(m map[structs.ServiceID]struct{}) (structs.ServiceID, bool) {
// //
// If an override is returned the index returned will be 0. // If an override is returned the index returned will be 0.
func (s *Store) getProxyConfigEntryTxn( func (s *Store) getProxyConfigEntryTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
name string, name string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -724,7 +724,7 @@ func (s *Store) getProxyConfigEntryTxn(
// //
// If an override is returned the index returned will be 0. // If an override is returned the index returned will be 0.
func (s *Store) getServiceConfigEntryTxn( func (s *Store) getServiceConfigEntryTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
serviceName string, serviceName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -749,7 +749,7 @@ func (s *Store) getServiceConfigEntryTxn(
// //
// If an override is returned the index returned will be 0. // If an override is returned the index returned will be 0.
func (s *Store) getRouterConfigEntryTxn( func (s *Store) getRouterConfigEntryTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
serviceName string, serviceName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -774,7 +774,7 @@ func (s *Store) getRouterConfigEntryTxn(
// //
// If an override is returned the index returned will be 0. // If an override is returned the index returned will be 0.
func (s *Store) getSplitterConfigEntryTxn( func (s *Store) getSplitterConfigEntryTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
serviceName string, serviceName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -799,7 +799,7 @@ func (s *Store) getSplitterConfigEntryTxn(
// //
// If an override is returned the index returned will be 0. // If an override is returned the index returned will be 0.
func (s *Store) getResolverConfigEntryTxn( func (s *Store) getResolverConfigEntryTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
serviceName string, serviceName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
@ -820,7 +820,7 @@ func (s *Store) getResolverConfigEntryTxn(
} }
func (s *Store) configEntryWithOverridesTxn( func (s *Store) configEntryWithOverridesTxn(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
kind string, kind string,
name string, name string,
@ -840,7 +840,7 @@ func (s *Store) configEntryWithOverridesTxn(
} }
func (s *Store) validateProposedIngressProtocolsInServiceGraph( func (s *Store) validateProposedIngressProtocolsInServiceGraph(
tx *txnWrapper, tx *txn,
next structs.ConfigEntry, next structs.ConfigEntry,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
) error { ) error {
@ -884,7 +884,7 @@ func (s *Store) validateProposedIngressProtocolsInServiceGraph(
// protocolForService returns the service graph protocol associated to the // protocolForService returns the service graph protocol associated to the
// provided service, checking all relevant config entries. // provided service, checking all relevant config entries.
func (s *Store) protocolForService( func (s *Store) protocolForService(
tx *txnWrapper, tx *txn,
ws memdb.WatchSet, ws memdb.WatchSet,
svc structs.ServiceName, svc structs.ServiceName,
) (uint64, string, error) { ) (uint64, string, error) {

View File

@ -49,25 +49,25 @@ func configTableSchema() *memdb.TableSchema {
} }
} }
func (s *Store) firstConfigEntryWithTxn(tx *txnWrapper, func (s *Store) firstConfigEntryWithTxn(tx *txn,
kind, name string, entMeta *structs.EnterpriseMeta) (interface{}, error) { kind, name string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
return tx.First(configTableName, "id", kind, name) return tx.First(configTableName, "id", kind, name)
} }
func (s *Store) firstWatchConfigEntryWithTxn(tx *txnWrapper, func (s *Store) firstWatchConfigEntryWithTxn(tx *txn,
kind, name string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { kind, name string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(configTableName, "id", kind, name) return tx.FirstWatch(configTableName, "id", kind, name)
} }
func (s *Store) validateConfigEntryEnterprise(tx *txnWrapper, conf structs.ConfigEntry) error { func (s *Store) validateConfigEntryEnterprise(tx *txn, conf structs.ConfigEntry) error {
return nil return nil
} }
func getAllConfigEntriesWithTxn(tx *txnWrapper, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func getAllConfigEntriesWithTxn(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(configTableName, "id") return tx.Get(configTableName, "id")
} }
func getConfigEntryKindsWithTxn(tx *txnWrapper, func getConfigEntryKindsWithTxn(tx *txn,
kind string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { kind string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(configTableName, "kind", kind) return tx.Get(configTableName, "kind", kind)
} }

View File

@ -116,7 +116,7 @@ func (s *Store) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, e
return s.caConfigTxn(tx, ws) return s.caConfigTxn(tx, ws)
} }
func (s *Store) caConfigTxn(tx *txnWrapper, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { func (s *Store) caConfigTxn(tx *txn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
// Get the CA config // Get the CA config
ch, c, err := tx.FirstWatch(caConfigTableName, "id") ch, c, err := tx.FirstWatch(caConfigTableName, "id")
if err != nil { if err != nil {
@ -174,7 +174,7 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur
return err == nil, err return err == nil, err
} }
func (s *Store) caSetConfigTxn(idx uint64, tx *txnWrapper, config *structs.CAConfiguration) error { func (s *Store) caSetConfigTxn(idx uint64, tx *txn, config *structs.CAConfiguration) error {
// Check for an existing config // Check for an existing config
prev, err := tx.First(caConfigTableName, "id") prev, err := tx.First(caConfigTableName, "id")
if err != nil { if err != nil {
@ -236,7 +236,7 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) {
return s.caRootsTxn(tx, ws) return s.caRootsTxn(tx, ws)
} }
func (s *Store) caRootsTxn(tx *txnWrapper, ws memdb.WatchSet) (uint64, structs.CARoots, error) { func (s *Store) caRootsTxn(tx *txn, ws memdb.WatchSet) (uint64, structs.CARoots, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, caRootTableName) idx := maxIndexTxn(tx, caRootTableName)

View File

@ -84,7 +84,7 @@ func (s *Store) FederationStateSet(idx uint64, config *structs.FederationState)
} }
// federationStateSetTxn upserts a federation state inside of a transaction. // federationStateSetTxn upserts a federation state inside of a transaction.
func (s *Store) federationStateSetTxn(tx *txnWrapper, idx uint64, config *structs.FederationState) error { func (s *Store) federationStateSetTxn(tx *txn, idx uint64, config *structs.FederationState) error {
if config.Datacenter == "" { if config.Datacenter == "" {
return fmt.Errorf("missing datacenter on federation state") return fmt.Errorf("missing datacenter on federation state")
} }
@ -134,7 +134,7 @@ func (s *Store) FederationStateGet(ws memdb.WatchSet, datacenter string) (uint64
return s.federationStateGetTxn(tx, ws, datacenter) return s.federationStateGetTxn(tx, ws, datacenter)
} }
func (s *Store) federationStateGetTxn(tx *txnWrapper, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) { func (s *Store) federationStateGetTxn(tx *txn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, federationStateTableName) idx := maxIndexTxn(tx, federationStateTableName)
@ -164,7 +164,7 @@ func (s *Store) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.Feder
return s.federationStateListTxn(tx, ws) return s.federationStateListTxn(tx, ws)
} }
func (s *Store) federationStateListTxn(tx *txnWrapper, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) { func (s *Store) federationStateListTxn(tx *txn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, federationStateTableName) idx := maxIndexTxn(tx, federationStateTableName)
@ -205,7 +205,7 @@ func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) err
return tx.Commit() return tx.Commit()
} }
func (s *Store) federationStateDeleteTxn(tx *txnWrapper, idx uint64, datacenter string) error { func (s *Store) federationStateDeleteTxn(tx *txn, idx uint64, datacenter string) error {
// Try to retrieve the existing federation state. // Try to retrieve the existing federation state.
existing, err := tx.First(federationStateTableName, "id", datacenter) existing, err := tx.First(federationStateTableName, "id", datacenter)
if err != nil { if err != nil {

View File

@ -28,7 +28,7 @@ func NewGraveyard(gc *TombstoneGC) *Graveyard {
} }
// InsertTxn adds a new tombstone. // InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *txnWrapper, key string, idx uint64, entMeta *structs.EnterpriseMeta) error { func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error {
stone := &Tombstone{ stone := &Tombstone{
Key: key, Key: key,
Index: idx, Index: idx,
@ -51,7 +51,7 @@ func (g *Graveyard) InsertTxn(tx *txnWrapper, key string, idx uint64, entMeta *s
// GetMaxIndexTxn returns the highest index tombstone whose key matches the // GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match. // given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *txnWrapper, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) { func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) {
stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta) stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta)
if err != nil { if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err) return 0, fmt.Errorf("failed querying tombstones: %s", err)
@ -68,7 +68,7 @@ func (g *Graveyard) GetMaxIndexTxn(tx *txnWrapper, prefix string, entMeta *struc
} }
// DumpTxn returns all the tombstones. // DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *txnWrapper) (memdb.ResultIterator, error) { func (g *Graveyard) DumpTxn(tx *txn) (memdb.ResultIterator, error) {
iter, err := tx.Get("tombstones", "id") iter, err := tx.Get("tombstones", "id")
if err != nil { if err != nil {
return nil, err return nil, err
@ -79,7 +79,7 @@ func (g *Graveyard) DumpTxn(tx *txnWrapper) (memdb.ResultIterator, error) {
// RestoreTxn is used when restoring from a snapshot. For general inserts, use // RestoreTxn is used when restoring from a snapshot. For general inserts, use
// InsertTxn. // InsertTxn.
func (g *Graveyard) RestoreTxn(tx *txnWrapper, stone *Tombstone) error { func (g *Graveyard) RestoreTxn(tx *txn, stone *Tombstone) error {
if err := g.insertTombstoneWithTxn(tx, "tombstones", stone, true); err != nil { if err := g.insertTombstoneWithTxn(tx, "tombstones", stone, true); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err) return fmt.Errorf("failed inserting tombstone: %s", err)
} }
@ -89,7 +89,7 @@ func (g *Graveyard) RestoreTxn(tx *txnWrapper, stone *Tombstone) error {
// ReapTxn cleans out all tombstones whose index values are less than or equal // ReapTxn cleans out all tombstones whose index values are less than or equal
// to the given idx. This prevents unbounded storage growth of the tombstones. // to the given idx. This prevents unbounded storage growth of the tombstones.
func (g *Graveyard) ReapTxn(tx *txnWrapper, idx uint64) error { func (g *Graveyard) ReapTxn(tx *txn, idx uint64) error {
// This does a full table scan since we currently can't index on a // This does a full table scan since we currently can't index on a
// numeric value. Since this is all in-memory and done infrequently // numeric value. Since this is all in-memory and done infrequently
// this pretty reasonable. // this pretty reasonable.

View File

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
) )
func (g *Graveyard) insertTombstoneWithTxn(tx *txnWrapper, func (g *Graveyard) insertTombstoneWithTxn(tx *txn,
table string, stone *Tombstone, updateMax bool) error { table string, stone *Tombstone, updateMax bool) error {
if err := tx.Insert("tombstones", stone); err != nil { if err := tx.Insert("tombstones", stone); err != nil {

View File

@ -169,7 +169,7 @@ func (s *Store) IntentionSet(idx uint64, ixn *structs.Intention) error {
// intentionSetTxn is the inner method used to insert an intention with // intentionSetTxn is the inner method used to insert an intention with
// the proper indexes into the state store. // the proper indexes into the state store.
func (s *Store) intentionSetTxn(tx *txnWrapper, idx uint64, ixn *structs.Intention) error { func (s *Store) intentionSetTxn(tx *txn, idx uint64, ixn *structs.Intention) error {
// ID is required // ID is required
if ixn.ID == "" { if ixn.ID == "" {
return ErrMissingIntentionID return ErrMissingIntentionID
@ -264,7 +264,7 @@ func (s *Store) IntentionDelete(idx uint64, id string) error {
// intentionDeleteTxn is the inner method used to delete a intention // intentionDeleteTxn is the inner method used to delete a intention
// with the proper indexes into the state store. // with the proper indexes into the state store.
func (s *Store) intentionDeleteTxn(tx *txnWrapper, idx uint64, queryID string) error { func (s *Store) intentionDeleteTxn(tx *txn, idx uint64, queryID string) error {
// Pull the query. // Pull the query.
wrapped, err := tx.First(intentionsTableName, "id", queryID) wrapped, err := tx.First(intentionsTableName, "id", queryID)
if err != nil { if err != nil {

View File

@ -117,7 +117,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
// If updateSession is true, then the incoming entry will set the new // If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep // session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is. // whatever the existing session is.
func (s *Store) kvsSetTxn(tx *txnWrapper, idx uint64, entry *structs.DirEntry, updateSession bool) error { func (s *Store) kvsSetTxn(tx *txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair // Retrieve an existing KV pair
existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil { if err != nil {
@ -170,7 +170,7 @@ func (s *Store) KVSGet(ws memdb.WatchSet, key string, entMeta *structs.Enterpris
// kvsGetTxn is the inner method that gets a KVS entry inside an existing // kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction. // transaction.
func (s *Store) kvsGetTxn(tx *txnWrapper, func (s *Store) kvsGetTxn(tx *txn,
ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) { ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) {
// Get the table index. // Get the table index.
@ -203,7 +203,7 @@ func (s *Store) KVSList(ws memdb.WatchSet,
// kvsListTxn is the inner method that gets a list of KVS entries matching a // kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix. // prefix.
func (s *Store) kvsListTxn(tx *txnWrapper, func (s *Store) kvsListTxn(tx *txn,
ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
// Get the table indexes. // Get the table indexes.
@ -252,7 +252,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet
// kvsDeleteTxn is the inner method used to perform the actual deletion // kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction. // of a key/value pair within an existing transaction.
func (s *Store) kvsDeleteTxn(tx *txnWrapper, idx uint64, key string, entMeta *structs.EnterpriseMeta) error { func (s *Store) kvsDeleteTxn(tx *txn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
// Look up the entry in the state store. // Look up the entry in the state store.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil { if err != nil {
@ -289,7 +289,7 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing // kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction. // transaction.
func (s *Store) kvsDeleteCASTxn(tx *txnWrapper, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) kvsDeleteCASTxn(tx *txn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Retrieve the existing kvs entry, if any exists. // Retrieve the existing kvs entry, if any exists.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil { if err != nil {
@ -330,7 +330,7 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsSetCASTxn is the inner method used to do a CAS inside an existing // kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction. // transaction.
func (s *Store) kvsSetCASTxn(tx *txnWrapper, idx uint64, entry *structs.DirEntry) (bool, error) { func (s *Store) kvsSetCASTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil { if err != nil {
@ -394,7 +394,7 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsLockTxn is the inner method that does a lock inside an existing // kvsLockTxn is the inner method that does a lock inside an existing
// transaction. // transaction.
func (s *Store) kvsLockTxn(tx *txnWrapper, idx uint64, entry *structs.DirEntry) (bool, error) { func (s *Store) kvsLockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -460,7 +460,7 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsUnlockTxn is the inner method that does an unlock inside an existing // kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction. // transaction.
func (s *Store) kvsUnlockTxn(tx *txnWrapper, idx uint64, entry *structs.DirEntry) (bool, error) { func (s *Store) kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -498,7 +498,7 @@ func (s *Store) kvsUnlockTxn(tx *txnWrapper, idx uint64, entry *structs.DirEntry
// kvsCheckSessionTxn checks to see if the given session matches the current // kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key. // entry for a key.
func (s *Store) kvsCheckSessionTxn(tx *txnWrapper, func (s *Store) kvsCheckSessionTxn(tx *txn,
key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
@ -519,7 +519,7 @@ func (s *Store) kvsCheckSessionTxn(tx *txnWrapper,
// kvsCheckIndexTxn checks to see if the given modify index matches the current // kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key. // entry for a key.
func (s *Store) kvsCheckIndexTxn(tx *txnWrapper, func (s *Store) kvsCheckIndexTxn(tx *txn,
key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)

View File

@ -16,7 +16,7 @@ func kvsIndexer() *memdb.StringFieldIndex {
} }
} }
func (s *Store) insertKVTxn(tx *txnWrapper, entry *structs.DirEntry, updateMax bool) error { func (s *Store) insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error {
if err := tx.Insert("kvs", entry); err != nil { if err := tx.Insert("kvs", entry); err != nil {
return err return err
} }
@ -33,7 +33,7 @@ func (s *Store) insertKVTxn(tx *txnWrapper, entry *structs.DirEntry, updateMax b
return nil return nil
} }
func (s *Store) kvsListEntriesTxn(tx *txnWrapper, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { func (s *Store) kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries var ents structs.DirEntries
var lindex uint64 var lindex uint64
@ -56,7 +56,7 @@ func (s *Store) kvsListEntriesTxn(tx *txnWrapper, ws memdb.WatchSet, prefix stri
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction. // existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *txnWrapper, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error { func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
// For prefix deletes, only insert one tombstone and delete the entire subtree // For prefix deletes, only insert one tombstone and delete the entire subtree
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil { if err != nil {
@ -77,11 +77,11 @@ func (s *Store) kvsDeleteTreeTxn(tx *txnWrapper, idx uint64, prefix string, entM
return nil return nil
} }
func kvsMaxIndex(tx *txnWrapper, entMeta *structs.EnterpriseMeta) uint64 { func kvsMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "kvs", "tombstones") return maxIndexTxn(tx, "kvs", "tombstones")
} }
func (s *Store) kvsDeleteWithEntry(tx *txnWrapper, entry *structs.DirEntry, idx uint64) error { func (s *Store) kvsDeleteWithEntry(tx *txn, entry *structs.DirEntry, idx uint64) error {
// Delete the entry and update the index. // Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil { if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err) return fmt.Errorf("failed deleting kvs entry: %s", err)

View File

@ -17,7 +17,7 @@ type changeTrackerDB struct {
// with write=true. // with write=true.
// //
// Deprecated: use either ReadTxn, or WriteTxn. // Deprecated: use either ReadTxn, or WriteTxn.
func (db *changeTrackerDB) Txn(write bool) *txnWrapper { func (db *changeTrackerDB) Txn(write bool) *txn {
if write { if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
} }
@ -26,8 +26,8 @@ func (db *changeTrackerDB) Txn(write bool) *txnWrapper {
// ReadTxn returns a read-only transaction which behaves exactly the same as // ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn // memdb.Txn
func (db *changeTrackerDB) ReadTxn() *txnWrapper { func (db *changeTrackerDB) ReadTxn() *txn {
return &txnWrapper{Txn: db.db.Txn(false)} return &txn{Txn: db.db.Txn(false)}
} }
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
@ -40,8 +40,8 @@ func (db *changeTrackerDB) ReadTxn() *txnWrapper {
// The exceptional cases are transactions that are executed on an empty // The exceptional cases are transactions that are executed on an empty
// memdb.DB as part of Restore, and those executed by tests where we insert // memdb.DB as part of Restore, and those executed by tests where we insert
// data directly into the DB. These cases may use WriteTxnRestore. // data directly into the DB. These cases may use WriteTxnRestore.
func (db *changeTrackerDB) WriteTxn(idx uint64) *txnWrapper { func (db *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txnWrapper{ t := &txn{
Txn: db.db.Txn(true), Txn: db.db.Txn(true),
Index: idx, Index: idx,
} }
@ -55,22 +55,21 @@ func (db *changeTrackerDB) WriteTxn(idx uint64) *txnWrapper {
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur // WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously // at one index - the effect is to write many values that were previously
// written across many indexes. // written across many indexes.
func (db *changeTrackerDB) WriteTxnRestore() *txnWrapper { func (db *changeTrackerDB) WriteTxnRestore() *txn {
t := &txnWrapper{ t := &txn{
Txn: db.db.Txn(true), Txn: db.db.Txn(true),
Index: 0, Index: 0,
} }
return t return t
} }
// txnWrapper wraps a memdb.Txn to capture changes and send them to the // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
// EventPublisher.
// //
// This can not be done with txn.Defer because the callback passed to Defer is // This can not be done with txn.Defer because the callback passed to Defer is
// invoked after commit completes, and because the callback can not return an // invoked after commit completes, and because the callback can not return an
// error. Any errors from the callback would be lost, which would result in a // error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed. // missing change event, even though the state store had changed.
type txnWrapper struct { type txn struct {
// Index in raft where the write is occurring. The value is zero for a // Index in raft where the write is occurring. The value is zero for a
// read-only transaction, and for a WriteTxnRestore transaction. // read-only transaction, and for a WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part // Index is stored so that it may be passed along to any subscribers as part
@ -85,9 +84,9 @@ type txnWrapper struct {
// Note that this function, unlike memdb.Txn, returns an error which must be checked // Note that this function, unlike memdb.Txn, returns an error which must be checked
// by the caller. A non-nil error indicates that a commit failed and was not // by the caller. A non-nil error indicates that a commit failed and was not
// applied. // applied.
func (tx *txnWrapper) Commit() error { func (tx *txn) Commit() error {
// changes may be empty if this is a read-only or WriteTxnRestore transaction. // changes may be empty if this is a read-only or WriteTxnRestore transaction.
// TODO: publish changes: changes := tx.Txn.Changes() // TODO(streaming): publish changes: changes := tx.Txn.Changes()
tx.Txn.Commit() tx.Txn.Commit()
return nil return nil

View File

@ -7,30 +7,30 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
func firstWithTxn(tx *txnWrapper, func firstWithTxn(tx *txn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) { table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
return tx.First(table, index, idxVal) return tx.First(table, index, idxVal)
} }
func firstWatchWithTxn(tx *txnWrapper, func firstWatchWithTxn(tx *txn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVal) return tx.FirstWatch(table, index, idxVal)
} }
func firstWatchCompoundWithTxn(tx *txnWrapper, func firstWatchCompoundWithTxn(tx *txn,
table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) { table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVals...) return tx.FirstWatch(table, index, idxVals...)
} }
func getWithTxn(tx *txnWrapper, func getWithTxn(tx *txn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(table, index, idxVal) return tx.Get(table, index, idxVal)
} }
func getCompoundWithTxn(tx *txnWrapper, table, index string, func getCompoundWithTxn(tx *txn, table, index string,
_ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) { _ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) {
return tx.Get(table, index, idxVals...) return tx.Get(table, index, idxVals...)

View File

@ -142,7 +142,7 @@ func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error
// preparedQuerySetTxn is the inner method used to insert a prepared query with // preparedQuerySetTxn is the inner method used to insert a prepared query with
// the proper indexes into the state store. // the proper indexes into the state store.
func (s *Store) preparedQuerySetTxn(tx *txnWrapper, idx uint64, query *structs.PreparedQuery) error { func (s *Store) preparedQuerySetTxn(tx *txn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set. // Check that the ID is set.
if query.ID == "" { if query.ID == "" {
return ErrMissingQueryID return ErrMissingQueryID
@ -258,7 +258,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
// preparedQueryDeleteTxn is the inner method used to delete a prepared query // preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store. // with the proper indexes into the state store.
func (s *Store) preparedQueryDeleteTxn(tx *txnWrapper, idx uint64, queryID string) error { func (s *Store) preparedQueryDeleteTxn(tx *txn, idx uint64, queryID string) error {
// Pull the query. // Pull the query.
wrapped, err := tx.First("prepared-queries", "id", queryID) wrapped, err := tx.First("prepared-queries", "id", queryID)
if err != nil { if err != nil {

View File

@ -176,7 +176,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
// sessionCreateTxn is the inner method used for creating session entries in // sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be // an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered. // checked for failing status. Returns any error encountered.
func (s *Store) sessionCreateTxn(tx *txnWrapper, idx uint64, sess *structs.Session) error { func (s *Store) sessionCreateTxn(tx *txn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID // Check that we have a session ID
if sess.ID == "" { if sess.ID == "" {
return ErrMissingSessionID return ErrMissingSessionID
@ -301,7 +301,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
// deleteSessionTxn is the inner method, which is used to do the actual // deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, etc. // session deletion and handle session invalidation, etc.
func (s *Store) deleteSessionTxn(tx *txnWrapper, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteSessionTxn(tx *txn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
// Look up the session. // Look up the session.
sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta) sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta)
if err != nil { if err != nil {

View File

@ -35,7 +35,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex {
} }
} }
func (s *Store) sessionDeleteWithSession(tx *txnWrapper, session *structs.Session, idx uint64) error { func (s *Store) sessionDeleteWithSession(tx *txn, session *structs.Session, idx uint64) error {
if err := tx.Delete("sessions", session); err != nil { if err := tx.Delete("sessions", session); err != nil {
return fmt.Errorf("failed deleting session: %s", err) return fmt.Errorf("failed deleting session: %s", err)
} }
@ -48,7 +48,7 @@ func (s *Store) sessionDeleteWithSession(tx *txnWrapper, session *structs.Sessio
return nil return nil
} }
func (s *Store) insertSessionTxn(tx *txnWrapper, session *structs.Session, idx uint64, updateMax bool) error { func (s *Store) insertSessionTxn(tx *txn, session *structs.Session, idx uint64, updateMax bool) error {
if err := tx.Insert("sessions", session); err != nil { if err := tx.Insert("sessions", session); err != nil {
return err return err
} }
@ -80,11 +80,11 @@ func (s *Store) insertSessionTxn(tx *txnWrapper, session *structs.Session, idx u
return nil return nil
} }
func (s *Store) allNodeSessionsTxn(tx *txnWrapper, node string) (structs.Sessions, error) { func (s *Store) allNodeSessionsTxn(tx *txn, node string) (structs.Sessions, error) {
return s.nodeSessionsTxn(tx, nil, node, nil) return s.nodeSessionsTxn(tx, nil, node, nil)
} }
func (s *Store) nodeSessionsTxn(tx *txnWrapper, func (s *Store) nodeSessionsTxn(tx *txn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get("sessions", "node", node) sessions, err := tx.Get("sessions", "node", node)
@ -100,11 +100,11 @@ func (s *Store) nodeSessionsTxn(tx *txnWrapper,
return result, nil return result, nil
} }
func (s *Store) sessionMaxIndex(tx *txnWrapper, entMeta *structs.EnterpriseMeta) uint64 { func (s *Store) sessionMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "sessions") return maxIndexTxn(tx, "sessions")
} }
func (s *Store) validateSessionChecksTxn(tx *txnWrapper, session *structs.Session) error { func (s *Store) validateSessionChecksTxn(tx *txn, session *structs.Session) error {
// Go over the session checks and ensure they exist. // Go over the session checks and ensure they exist.
for _, checkID := range session.CheckIDs() { for _, checkID := range session.CheckIDs() {
check, err := tx.First("checks", "id", session.Node, string(checkID)) check, err := tx.First("checks", "id", session.Node, string(checkID))

View File

@ -115,7 +115,7 @@ type Store struct {
// works by starting a read transaction against the whole state store. // works by starting a read transaction against the whole state store.
type Snapshot struct { type Snapshot struct {
store *Store store *Store
tx *txnWrapper tx *txn
lastIndex uint64 lastIndex uint64
} }
@ -123,7 +123,7 @@ type Snapshot struct {
// data to a state store. // data to a state store.
type Restore struct { type Restore struct {
store *Store store *Store
tx *txnWrapper tx *txn
} }
// IndexEntry keeps a record of the last index per-table. // IndexEntry keeps a record of the last index per-table.
@ -247,11 +247,11 @@ func (s *Store) maxIndex(tables ...string) uint64 {
// maxIndexTxn is a helper used to retrieve the highest known index // maxIndexTxn is a helper used to retrieve the highest known index
// amongst a set of tables in the db. // amongst a set of tables in the db.
func maxIndexTxn(tx *txnWrapper, tables ...string) uint64 { func maxIndexTxn(tx *txn, tables ...string) uint64 {
return maxIndexWatchTxn(tx, nil, tables...) return maxIndexWatchTxn(tx, nil, tables...)
} }
func maxIndexWatchTxn(tx *txnWrapper, ws memdb.WatchSet, tables ...string) uint64 { func maxIndexWatchTxn(tx *txn, ws memdb.WatchSet, tables ...string) uint64 {
var lindex uint64 var lindex uint64
for _, table := range tables { for _, table := range tables {
ch, ti, err := tx.FirstWatch("index", "id", table) ch, ti, err := tx.FirstWatch("index", "id", table)
@ -268,7 +268,7 @@ func maxIndexWatchTxn(tx *txnWrapper, ws memdb.WatchSet, tables ...string) uint6
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to // indexUpdateMaxTxn is used when restoring entries and sets the table's index to
// the given idx only if it's greater than the current index. // the given idx only if it's greater than the current index.
func indexUpdateMaxTxn(tx *txnWrapper, idx uint64, table string) error { func indexUpdateMaxTxn(tx *txn, idx uint64, table string) error {
ti, err := tx.First("index", "id", table) ti, err := tx.First("index", "id", table)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve existing index: %s", err) return fmt.Errorf("failed to retrieve existing index: %s", err)

View File

@ -8,7 +8,7 @@ import (
) )
// txnKVS handles all KV-related operations. // txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx *txnWrapper, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry var entry *structs.DirEntry
var err error var err error
@ -110,7 +110,7 @@ func (s *Store) txnKVS(tx *txnWrapper, idx uint64, op *structs.TxnKVOp) (structs
} }
// txnSession handles all Session-related operations. // txnSession handles all Session-related operations.
func (s *Store) txnSession(tx *txnWrapper, idx uint64, op *structs.TxnSessionOp) error { func (s *Store) txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
var err error var err error
switch op.Verb { switch op.Verb {
@ -127,7 +127,7 @@ func (s *Store) txnSession(tx *txnWrapper, idx uint64, op *structs.TxnSessionOp)
} }
// txnIntention handles all Intention-related operations. // txnIntention handles all Intention-related operations.
func (s *Store) txnIntention(tx *txnWrapper, idx uint64, op *structs.TxnIntentionOp) error { func (s *Store) txnIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
switch op.Op { switch op.Op {
case structs.IntentionOpCreate, structs.IntentionOpUpdate: case structs.IntentionOpCreate, structs.IntentionOpUpdate:
return s.intentionSetTxn(tx, idx, op.Intention) return s.intentionSetTxn(tx, idx, op.Intention)
@ -139,7 +139,7 @@ func (s *Store) txnIntention(tx *txnWrapper, idx uint64, op *structs.TxnIntentio
} }
// txnNode handles all Node-related operations. // txnNode handles all Node-related operations.
func (s *Store) txnNode(tx *txnWrapper, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
var entry *structs.Node var entry *structs.Node
var err error var err error
@ -208,7 +208,7 @@ func (s *Store) txnNode(tx *txnWrapper, idx uint64, op *structs.TxnNodeOp) (stru
} }
// txnService handles all Service-related operations. // txnService handles all Service-related operations.
func (s *Store) txnService(tx *txnWrapper, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { func (s *Store) txnService(tx *txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
switch op.Verb { switch op.Verb {
case api.ServiceGet: case api.ServiceGet:
entry, err := s.getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta) entry, err := s.getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta)
@ -270,7 +270,7 @@ func newTxnResultFromNodeServiceEntry(entry *structs.NodeService) structs.TxnRes
} }
// txnCheck handles all Check-related operations. // txnCheck handles all Check-related operations.
func (s *Store) txnCheck(tx *txnWrapper, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
var entry *structs.HealthCheck var entry *structs.HealthCheck
var err error var err error
@ -332,7 +332,7 @@ func (s *Store) txnCheck(tx *txnWrapper, idx uint64, op *structs.TxnCheckOp) (st
} }
// txnDispatch runs the given operations inside the state store transaction. // txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx *txnWrapper, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops)) results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops { for i, op := range ops {