state: use ReadTxn and WriteTxn interface

Instead of *txn, so that we can replace the txn implementation with others, and so
that the function is easily documented as a read or write function.
This commit is contained in:
Daniel Nephin 2021-03-08 17:41:10 -05:00
parent af5c8e6243
commit 9514698b10
10 changed files with 44 additions and 43 deletions

View File

@ -343,7 +343,7 @@ func fixupACLLinks(tx ReadTxn, original []pbacl.ACLLink, getName func(ReadTxn, s
return links, owned, nil
}
func resolveTokenPolicyLinks(tx *txn, token *structs.ACLToken, allowMissing bool) (int, error) {
func resolveTokenPolicyLinks(tx ReadTxn, token *structs.ACLToken, allowMissing bool) (int, error) {
var numValid int
for linkIndex, link := range token.Policies {
if link.ID != "" {
@ -417,7 +417,7 @@ func fixupTokenPolicyLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACL
return token, nil
}
func resolveTokenRoleLinks(tx *txn, token *structs.ACLToken, allowMissing bool) (int, error) {
func resolveTokenRoleLinks(tx ReadTxn, token *structs.ACLToken, allowMissing bool) (int, error) {
var numValid int
for linkIndex, link := range token.Roles {
if link.ID != "" {
@ -491,7 +491,7 @@ func fixupTokenRoleLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLTo
return token, nil
}
func resolveRolePolicyLinks(tx *txn, role *structs.ACLRole, allowMissing bool) error {
func resolveRolePolicyLinks(tx ReadTxn, role *structs.ACLRole, allowMissing bool) error {
for linkIndex, link := range role.Policies {
if link.ID == "" {
return fmt.Errorf("Encountered a Role with policies linked by Name in the state store")
@ -602,7 +602,7 @@ func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, opts ACLT
// aclTokenSetTxn is the inner method used to insert an ACL token with the
// proper indexes into the state store.
func aclTokenSetTxn(tx *txn, idx uint64, token *structs.ACLToken, opts ACLTokenSetOptions) error {
func aclTokenSetTxn(tx WriteTxn, idx uint64, token *structs.ACLToken, opts ACLTokenSetOptions) error {
// Check that the ID is set
if token.SecretID == "" {
return ErrMissingACLTokenSecret
@ -993,7 +993,7 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string, entMeta *structs
return tx.Commit()
}
func aclTokenDeleteTxn(tx *txn, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error {
func aclTokenDeleteTxn(tx WriteTxn, idx uint64, value, index string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing token
_, token, err := aclTokenGetFromIndex(tx, value, index, entMeta)
if err != nil {
@ -1011,7 +1011,7 @@ func aclTokenDeleteTxn(tx *txn, idx uint64, value, index string, entMeta *struct
return aclTokenDeleteWithToken(tx, token.(*structs.ACLToken), idx)
}
func aclTokenDeleteAllForAuthMethodTxn(tx *txn, idx uint64, methodName string, methodGlobalLocality bool, methodMeta *structs.EnterpriseMeta) error {
func aclTokenDeleteAllForAuthMethodTxn(tx WriteTxn, idx uint64, methodName string, methodGlobalLocality bool, methodMeta *structs.EnterpriseMeta) error {
// collect all the tokens linked with the given auth method.
iter, err := aclTokenListByAuthMethod(tx, methodName, methodMeta, structs.WildcardEnterpriseMeta())
if err != nil {
@ -1068,7 +1068,7 @@ func (s *Store) ACLPolicySet(idx uint64, policy *structs.ACLPolicy) error {
return tx.Commit()
}
func aclPolicySetTxn(tx *txn, idx uint64, policy *structs.ACLPolicy) error {
func aclPolicySetTxn(tx WriteTxn, idx uint64, policy *structs.ACLPolicy) error {
// Check that the ID is set
if policy.ID == "" {
return ErrMissingACLPolicyID
@ -1254,7 +1254,7 @@ func (s *Store) aclPolicyDelete(idx uint64, value string, fn aclPolicyGetFn, ent
return tx.Commit()
}
func aclPolicyDeleteTxn(tx *txn, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error {
func aclPolicyDeleteTxn(tx WriteTxn, idx uint64, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) error {
// Look up the existing token
_, rawPolicy, err := fn(tx, value, entMeta)
if err != nil {
@ -1298,7 +1298,7 @@ func (s *Store) ACLRoleSet(idx uint64, role *structs.ACLRole) error {
return tx.Commit()
}
func aclRoleSetTxn(tx *txn, idx uint64, role *structs.ACLRole, allowMissing bool) error {
func aclRoleSetTxn(tx WriteTxn, idx uint64, role *structs.ACLRole, allowMissing bool) error {
// Check that the ID is set
if role.ID == "" {
return ErrMissingACLRoleID
@ -1507,7 +1507,7 @@ func (s *Store) aclRoleDelete(idx uint64, value string, fn aclRoleGetFn, entMeta
return tx.Commit()
}
func aclRoleDeleteTxn(tx *txn, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error {
func aclRoleDeleteTxn(tx WriteTxn, idx uint64, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) error {
// Look up the existing role
_, rawRole, err := fn(tx, value, entMeta)
if err != nil {
@ -1546,7 +1546,7 @@ func (s *Store) ACLBindingRuleSet(idx uint64, rule *structs.ACLBindingRule) erro
return tx.Commit()
}
func aclBindingRuleSetTxn(tx *txn, idx uint64, rule *structs.ACLBindingRule) error {
func aclBindingRuleSetTxn(tx WriteTxn, idx uint64, rule *structs.ACLBindingRule) error {
// Check that the ID and AuthMethod are set
if rule.ID == "" {
return ErrMissingACLBindingRuleID
@ -1662,7 +1662,7 @@ func (s *Store) aclBindingRuleDelete(idx uint64, id string, entMeta *structs.Ent
return tx.Commit()
}
func aclBindingRuleDeleteTxn(tx *txn, idx uint64, id string, entMeta *structs.EnterpriseMeta) error {
func aclBindingRuleDeleteTxn(tx WriteTxn, idx uint64, id string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing binding rule
_, rawRule, err := aclBindingRuleGetByID(tx, id, entMeta)
if err != nil {
@ -1681,7 +1681,7 @@ func aclBindingRuleDeleteTxn(tx *txn, idx uint64, id string, entMeta *structs.En
return nil
}
func aclBindingRuleDeleteAllForAuthMethodTxn(tx *txn, idx uint64, methodName string, entMeta *structs.EnterpriseMeta) error {
func aclBindingRuleDeleteAllForAuthMethodTxn(tx WriteTxn, idx uint64, methodName string, entMeta *structs.EnterpriseMeta) error {
// collect them all
iter, err := aclBindingRuleListByAuthMethod(tx, methodName, entMeta)
if err != nil {
@ -1731,7 +1731,7 @@ func (s *Store) ACLAuthMethodSet(idx uint64, method *structs.ACLAuthMethod) erro
return tx.Commit()
}
func aclAuthMethodSetTxn(tx *txn, idx uint64, method *structs.ACLAuthMethod) error {
func aclAuthMethodSetTxn(tx WriteTxn, idx uint64, method *structs.ACLAuthMethod) error {
// Check that the Name and Type are set
if method.Name == "" {
return ErrMissingACLAuthMethodName
@ -1846,7 +1846,7 @@ func (s *Store) aclAuthMethodDelete(idx uint64, name string, entMeta *structs.En
return tx.Commit()
}
func aclAuthMethodDeleteTxn(tx *txn, idx uint64, name string, entMeta *structs.EnterpriseMeta) error {
func aclAuthMethodDeleteTxn(tx WriteTxn, idx uint64, name string, entMeta *structs.EnterpriseMeta) error {
// Look up the existing method
_, rawMethod, err := aclAuthMethodGetByName(tx, name, entMeta)
if err != nil {

View File

@ -11,7 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
func aclPolicyInsert(tx *txn, policy *structs.ACLPolicy) error {
func aclPolicyInsert(tx WriteTxn, policy *structs.ACLPolicy) error {
if err := tx.Insert(tableACLPolicies, policy); err != nil {
return fmt.Errorf("failed inserting acl policy: %v", err)
}
@ -80,7 +80,7 @@ func aclPolicyGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan
return tx.FirstWatch(tableACLPolicies, indexID, id)
}
func aclPolicyDeleteWithPolicy(tx *txn, policy *structs.ACLPolicy, idx uint64) error {
func aclPolicyDeleteWithPolicy(tx WriteTxn, policy *structs.ACLPolicy, idx uint64) error {
// remove the policy
if err := tx.Delete(tableACLPolicies, policy); err != nil {
return fmt.Errorf("failed deleting acl policy: %v", err)
@ -97,7 +97,7 @@ func aclPolicyMaxIndex(tx ReadTxn, _ *structs.ACLPolicy, _ *structs.EnterpriseMe
return maxIndexTxn(tx, tableACLPolicies)
}
func aclPolicyUpsertValidateEnterprise(*txn, *structs.ACLPolicy, *structs.ACLPolicy) error {
func aclPolicyUpsertValidateEnterprise(ReadTxn, *structs.ACLPolicy, *structs.ACLPolicy) error {
return nil
}
@ -109,7 +109,7 @@ func (s *Store) ACLPolicyUpsertValidateEnterprise(*structs.ACLPolicy, *structs.A
///// ACL Token Functions /////
///////////////////////////////////////////////////////////////////////////////
func aclTokenInsert(tx *txn, token *structs.ACLToken) error {
func aclTokenInsert(tx WriteTxn, token *structs.ACLToken) error {
// insert the token into memdb
if err := tx.Insert("acl-tokens", token); err != nil {
return fmt.Errorf("failed inserting acl token: %v", err)
@ -151,7 +151,7 @@ func aclTokenListByAuthMethod(tx ReadTxn, authMethod string, _, _ *structs.Enter
return tx.Get("acl-tokens", "authmethod", authMethod)
}
func aclTokenDeleteWithToken(tx *txn, token *structs.ACLToken, idx uint64) error {
func aclTokenDeleteWithToken(tx WriteTxn, token *structs.ACLToken, idx uint64) error {
// remove the token
if err := tx.Delete("acl-tokens", token); err != nil {
return fmt.Errorf("failed deleting acl token: %v", err)
@ -168,7 +168,7 @@ func aclTokenMaxIndex(tx ReadTxn, _ *structs.ACLToken, entMeta *structs.Enterpri
return maxIndexTxn(tx, "acl-tokens")
}
func aclTokenUpsertValidateEnterprise(tx *txn, token *structs.ACLToken, existing *structs.ACLToken) error {
func aclTokenUpsertValidateEnterprise(tx WriteTxn, token *structs.ACLToken, existing *structs.ACLToken) error {
return nil
}
@ -180,7 +180,7 @@ func (s *Store) ACLTokenUpsertValidateEnterprise(token *structs.ACLToken, existi
///// ACL Role Functions /////
///////////////////////////////////////////////////////////////////////////////
func aclRoleInsert(tx *txn, role *structs.ACLRole) error {
func aclRoleInsert(tx WriteTxn, role *structs.ACLRole) error {
// insert the role into memdb
if err := tx.Insert(tableACLRoles, role); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err)
@ -197,7 +197,7 @@ func aclRoleGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan st
return tx.FirstWatch(tableACLRoles, indexID, id)
}
func aclRoleDeleteWithRole(tx *txn, role *structs.ACLRole, idx uint64) error {
func aclRoleDeleteWithRole(tx WriteTxn, role *structs.ACLRole, idx uint64) error {
// remove the role
if err := tx.Delete(tableACLRoles, role); err != nil {
return fmt.Errorf("failed deleting acl role: %v", err)
@ -214,7 +214,7 @@ func aclRoleMaxIndex(tx ReadTxn, _ *structs.ACLRole, _ *structs.EnterpriseMeta)
return maxIndexTxn(tx, tableACLRoles)
}
func aclRoleUpsertValidateEnterprise(tx *txn, role *structs.ACLRole, existing *structs.ACLRole) error {
func aclRoleUpsertValidateEnterprise(tx WriteTxn, role *structs.ACLRole, existing *structs.ACLRole) error {
return nil
}
@ -226,7 +226,7 @@ func (s *Store) ACLRoleUpsertValidateEnterprise(role *structs.ACLRole, existing
///// ACL Binding Rule Functions /////
///////////////////////////////////////////////////////////////////////////////
func aclBindingRuleInsert(tx *txn, rule *structs.ACLBindingRule) error {
func aclBindingRuleInsert(tx WriteTxn, rule *structs.ACLBindingRule) error {
// insert the role into memdb
if err := tx.Insert("acl-binding-rules", rule); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err)
@ -252,7 +252,7 @@ func aclBindingRuleListByAuthMethod(tx ReadTxn, method string, _ *structs.Enterp
return tx.Get("acl-binding-rules", "authmethod", method)
}
func aclBindingRuleDeleteWithRule(tx *txn, rule *structs.ACLBindingRule, idx uint64) error {
func aclBindingRuleDeleteWithRule(tx WriteTxn, rule *structs.ACLBindingRule, idx uint64) error {
// remove the rule
if err := tx.Delete("acl-binding-rules", rule); err != nil {
return fmt.Errorf("failed deleting acl binding rule: %v", err)
@ -269,7 +269,7 @@ func aclBindingRuleMaxIndex(tx ReadTxn, _ *structs.ACLBindingRule, entMeta *stru
return maxIndexTxn(tx, "acl-binding-rules")
}
func aclBindingRuleUpsertValidateEnterprise(tx *txn, rule *structs.ACLBindingRule, existing *structs.ACLBindingRule) error {
func aclBindingRuleUpsertValidateEnterprise(tx ReadTxn, rule *structs.ACLBindingRule, existing *structs.ACLBindingRule) error {
return nil
}
@ -281,7 +281,7 @@ func (s *Store) ACLBindingRuleUpsertValidateEnterprise(rule *structs.ACLBindingR
///// ACL Auth Method Functions /////
///////////////////////////////////////////////////////////////////////////////
func aclAuthMethodInsert(tx *txn, method *structs.ACLAuthMethod) error {
func aclAuthMethodInsert(tx WriteTxn, method *structs.ACLAuthMethod) error {
// insert the role into memdb
if err := tx.Insert("acl-auth-methods", method); err != nil {
return fmt.Errorf("failed inserting acl role: %v", err)
@ -303,7 +303,7 @@ func aclAuthMethodList(tx ReadTxn, entMeta *structs.EnterpriseMeta) (memdb.Resul
return tx.Get("acl-auth-methods", "id")
}
func aclAuthMethodDeleteWithMethod(tx *txn, method *structs.ACLAuthMethod, idx uint64) error {
func aclAuthMethodDeleteWithMethod(tx WriteTxn, method *structs.ACLAuthMethod, idx uint64) error {
// remove the method
if err := tx.Delete("acl-auth-methods", method); err != nil {
return fmt.Errorf("failed deleting acl auth method: %v", err)
@ -320,7 +320,7 @@ func aclAuthMethodMaxIndex(tx ReadTxn, _ *structs.ACLAuthMethod, entMeta *struct
return maxIndexTxn(tx, "acl-auth-methods")
}
func aclAuthMethodUpsertValidateEnterprise(tx *txn, method *structs.ACLAuthMethod, existing *structs.ACLAuthMethod) error {
func aclAuthMethodUpsertValidateEnterprise(_ ReadTxn, method *structs.ACLAuthMethod, existing *structs.ACLAuthMethod) error {
return nil
}

View File

@ -110,7 +110,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotCo
return err == nil, err
}
func autopilotSetConfigTxn(tx *txn, idx uint64, config *structs.AutopilotConfig) error {
func autopilotSetConfigTxn(tx WriteTxn, idx uint64, config *structs.AutopilotConfig) error {
// Check for an existing config
existing, err := tx.First("autopilot-config", "id")
if err != nil {

View File

@ -169,7 +169,7 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur
return err == nil, err
}
func (s *Store) caSetConfigTxn(idx uint64, tx *txn, config *structs.CAConfiguration) error {
func (s *Store) caSetConfigTxn(idx uint64, tx WriteTxn, config *structs.CAConfiguration) error {
// Check for an existing config
prev, err := tx.First(tableConnectCAConfig, "id")
if err != nil {

View File

@ -81,7 +81,7 @@ func (s *Store) FederationStateSet(idx uint64, config *structs.FederationState)
}
// federationStateSetTxn upserts a federation state inside of a transaction.
func federationStateSetTxn(tx *txn, idx uint64, config *structs.FederationState) error {
func federationStateSetTxn(tx WriteTxn, idx uint64, config *structs.FederationState) error {
if config.Datacenter == "" {
return fmt.Errorf("missing datacenter on federation state")
}
@ -202,7 +202,7 @@ func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) err
return tx.Commit()
}
func federationStateDeleteTxn(tx *txn, idx uint64, datacenter string) error {
func federationStateDeleteTxn(tx WriteTxn, idx uint64, datacenter string) error {
// Try to retrieve the existing federation state.
existing, err := tx.First(tableFederationStates, "id", datacenter)
if err != nil {

View File

@ -3,8 +3,9 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
// Tombstone is the internal type used to track tombstones.
@ -79,7 +80,7 @@ func (g *Graveyard) DumpTxn(tx ReadTxn) (memdb.ResultIterator, error) {
// RestoreTxn is used when restoring from a snapshot. For general inserts, use
// InsertTxn.
func (g *Graveyard) RestoreTxn(tx *txn, stone *Tombstone) error {
func (g *Graveyard) RestoreTxn(tx WriteTxn, stone *Tombstone) error {
if err := g.insertTombstoneWithTxn(tx, "tombstones", stone, true); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
@ -89,7 +90,7 @@ func (g *Graveyard) RestoreTxn(tx *txn, stone *Tombstone) error {
// ReapTxn cleans out all tombstones whose index values are less than or equal
// to the given idx. This prevents unbounded storage growth of the tombstones.
func (g *Graveyard) ReapTxn(tx *txn, idx uint64) error {
func (g *Graveyard) ReapTxn(tx WriteTxn, idx uint64) error {
// This does a full table scan since we currently can't index on a
// numeric value. Since this is all in-memory and done infrequently
// this pretty reasonable.

View File

@ -139,7 +139,7 @@ func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error
// preparedQuerySetTxn is the inner method used to insert a prepared query with
// the proper indexes into the state store.
func preparedQuerySetTxn(tx *txn, idx uint64, query *structs.PreparedQuery) error {
func preparedQuerySetTxn(tx WriteTxn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set.
if query.ID == "" {
return ErrMissingQueryID

View File

@ -172,7 +172,7 @@ func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
func sessionCreateTxn(tx *txn, idx uint64, sess *structs.Session) error {
func sessionCreateTxn(tx WriteTxn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID

View File

@ -49,7 +49,7 @@ func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64)
return nil
}
func insertSessionTxn(tx *txn, session *structs.Session, idx uint64, updateMax bool, _ bool) error {
func insertSessionTxn(tx WriteTxn, session *structs.Session, idx uint64, updateMax bool, _ bool) error {
if err := tx.Insert("sessions", session); err != nil {
return err
}
@ -105,7 +105,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "sessions")
}
func validateSessionChecksTxn(tx *txn, session *structs.Session) error {
func validateSessionChecksTxn(tx ReadTxn, session *structs.Session) error {
// Go over the session checks and ensure they exist.
for _, checkID := range session.CheckIDs() {
check, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: session.Node, CheckID: string(checkID)})

View File

@ -68,7 +68,7 @@ func (s *Store) SystemMetadataSet(idx uint64, entry *structs.SystemMetadataEntry
}
// systemMetadataSetTxn upserts a system metadata inside of a transaction.
func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntry) error {
func systemMetadataSetTxn(tx WriteTxn, idx uint64, entry *structs.SystemMetadataEntry) error {
// The only validation we care about is non-empty keys.
if entry.Key == "" {
return fmt.Errorf("missing key on system metadata")
@ -170,7 +170,7 @@ func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEn
return tx.Commit()
}
func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error {
func systemMetadataDeleteTxn(tx WriteTxn, idx uint64, key string) error {
// Try to retrieve the existing system metadata.
existing, err := tx.First(tableSystemMetadata, "id", key)
if err != nil {