Merge pull request #8618 from hashicorp/dnephin/remove-txn-readtxn

state: Use ReadTxn everywhere
This commit is contained in:
Daniel Nephin 2020-10-28 12:32:47 -04:00 committed by GitHub
commit 477d665309
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 171 additions and 175 deletions

View File

@ -434,7 +434,7 @@ func resolveTokenPolicyLinks(tx *txn, token *structs.ACLToken, allowMissing bool
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated // stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the policy links are still accurate then we just return the original // token only when fixes are needed. If the policy links are still accurate then we just return the original
// token. // token.
func fixupTokenPolicyLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) { func fixupTokenPolicyLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false owned := false
token := original token := original
@ -508,7 +508,7 @@ func resolveTokenRoleLinks(tx *txn, token *structs.ACLToken, allowMissing bool)
// stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated // stale when a linked role was deleted or renamed. This will correct them and generate a newly allocated
// token only when fixes are needed. If the role links are still accurate then we just return the original // token only when fixes are needed. If the role links are still accurate then we just return the original
// token. // token.
func fixupTokenRoleLinks(tx *txn, original *structs.ACLToken) (*structs.ACLToken, error) { func fixupTokenRoleLinks(tx ReadTxn, original *structs.ACLToken) (*structs.ACLToken, error) {
owned := false owned := false
token := original token := original
@ -824,7 +824,7 @@ func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64,
return idx, tokens, nil return idx, tokens, nil
} }
func aclTokenGetTxn(tx *txn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) { func aclTokenGetTxn(tx ReadTxn, ws memdb.WatchSet, value, index string, entMeta *structs.EnterpriseMeta) (*structs.ACLToken, error) {
watchCh, rawToken, err := aclTokenGetFromIndex(tx, value, index, entMeta) watchCh, rawToken, err := aclTokenGetFromIndex(tx, value, index, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl token lookup: %v", err) return nil, fmt.Errorf("failed acl token lookup: %v", err)
@ -1794,7 +1794,7 @@ func (s *Store) aclAuthMethodGet(ws memdb.WatchSet, name string, entMeta *struct
return idx, method, nil return idx, method, nil
} }
func getAuthMethodWithTxn(tx *txn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) { func getAuthMethodWithTxn(tx ReadTxn, ws memdb.WatchSet, name string, entMeta *structs.EnterpriseMeta) (*structs.ACLAuthMethod, error) {
watchCh, rawMethod, err := aclAuthMethodGetByName(tx, name, entMeta) watchCh, rawMethod, err := aclAuthMethodGetByName(tx, name, entMeta)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed acl auth method lookup: %v", err) return nil, fmt.Errorf("failed acl auth method lookup: %v", err)

View File

@ -243,7 +243,7 @@ func aclPolicyDeleteWithPolicy(tx *txn, policy *structs.ACLPolicy, idx uint64) e
return nil return nil
} }
func aclPolicyMaxIndex(tx *txn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 { func aclPolicyMaxIndex(tx ReadTxn, _ *structs.ACLPolicy, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-policies") return maxIndexTxn(tx, "acl-policies")
} }
@ -273,19 +273,19 @@ func aclTokenInsert(tx *txn, token *structs.ACLToken) error {
return nil return nil
} }
func aclTokenGetFromIndex(tx *txn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func aclTokenGetFromIndex(tx ReadTxn, id string, index string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-tokens", index, id) return tx.FirstWatch("acl-tokens", index, id)
} }
func aclTokenListAll(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListAll(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "id") return tx.Get("acl-tokens", "id")
} }
func aclTokenListLocal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListLocal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", true) return tx.Get("acl-tokens", "local", true)
} }
func aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListGlobal(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "local", false) return tx.Get("acl-tokens", "local", false)
} }
@ -297,7 +297,7 @@ func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (mem
return tx.Get("acl-tokens", "roles", role) return tx.Get("acl-tokens", "roles", role)
} }
func aclTokenListByAuthMethod(tx *txn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclTokenListByAuthMethod(tx ReadTxn, authMethod string, _, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "authmethod", authMethod) return tx.Get("acl-tokens", "authmethod", authMethod)
} }
@ -314,7 +314,7 @@ func aclTokenDeleteWithToken(tx *txn, token *structs.ACLToken, idx uint64) error
return nil return nil
} }
func aclTokenMaxIndex(tx *txn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 { func aclTokenMaxIndex(tx ReadTxn, _ *structs.ACLToken, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-tokens") return maxIndexTxn(tx, "acl-tokens")
} }
@ -372,7 +372,7 @@ func aclRoleDeleteWithRole(tx *txn, role *structs.ACLRole, idx uint64) error {
return nil return nil
} }
func aclRoleMaxIndex(tx *txn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 { func aclRoleMaxIndex(tx ReadTxn, _ *structs.ACLRole, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-roles") return maxIndexTxn(tx, "acl-roles")
} }
@ -402,15 +402,15 @@ func aclBindingRuleInsert(tx *txn, rule *structs.ACLBindingRule) error {
return nil return nil
} }
func aclBindingRuleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func aclBindingRuleGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-binding-rules", "id", id) return tx.FirstWatch("acl-binding-rules", "id", id)
} }
func aclBindingRuleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclBindingRuleList(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "id") return tx.Get("acl-binding-rules", "id")
} }
func aclBindingRuleListByAuthMethod(tx *txn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclBindingRuleListByAuthMethod(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-binding-rules", "authmethod", method) return tx.Get("acl-binding-rules", "authmethod", method)
} }
@ -427,7 +427,7 @@ func aclBindingRuleDeleteWithRule(tx *txn, rule *structs.ACLBindingRule, idx uin
return nil return nil
} }
func aclBindingRuleMaxIndex(tx *txn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 { func aclBindingRuleMaxIndex(tx ReadTxn, _ *structs.ACLBindingRule, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-binding-rules") return maxIndexTxn(tx, "acl-binding-rules")
} }
@ -457,11 +457,11 @@ func aclAuthMethodInsert(tx *txn, method *structs.ACLAuthMethod) error {
return nil return nil
} }
func aclAuthMethodGetByName(tx *txn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func aclAuthMethodGetByName(tx ReadTxn, method string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("acl-auth-methods", "id", method) return tx.FirstWatch("acl-auth-methods", "id", method)
} }
func aclAuthMethodList(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func aclAuthMethodList(tx ReadTxn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-auth-methods", "id") return tx.Get("acl-auth-methods", "id")
} }
@ -478,7 +478,7 @@ func aclAuthMethodDeleteWithMethod(tx *txn, method *structs.ACLAuthMethod, idx u
return nil return nil
} }
func aclAuthMethodMaxIndex(tx *txn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 { func aclAuthMethodMaxIndex(tx ReadTxn, _ *structs.ACLAuthMethod, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "acl-auth-methods") return maxIndexTxn(tx, "acl-auth-methods")
} }

View File

@ -6,15 +6,16 @@ import (
"reflect" "reflect"
"strings" "strings"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
) )
const ( const (
@ -262,10 +263,7 @@ func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
// performed within a single transaction to avoid race conditions on state // performed within a single transaction to avoid race conditions on state
// updates. // updates.
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, true, req); err != nil { return s.store.ensureRegistrationTxn(s.tx, idx, true, req)
return err
}
return nil
} }
// EnsureRegistration is used to make sure a node, service, and check // EnsureRegistration is used to make sure a node, service, and check
@ -282,7 +280,7 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
return tx.Commit() return tx.Commit()
} }
func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error { func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
if check.Node != node { if check.Node != node {
return fmt.Errorf("check node %q does not match node %q", return fmt.Errorf("check node %q does not match node %q",
check.Node, node) check.Node, node)
@ -296,7 +294,7 @@ func (s *Store) ensureCheckIfNodeMatches(tx *txn, idx uint64, preserveIndexes bo
// ensureRegistrationTxn is used to make sure a node, service, and check // ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race // registration is performed within a single transaction to avoid race
// conditions on state updates. // conditions on state updates.
func (s *Store) ensureRegistrationTxn(tx *txn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error { func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes bool, req *structs.RegisterRequest) error {
if _, err := validateRegisterRequestTxn(tx, req); err != nil { if _, err := validateRegisterRequestTxn(tx, req); err != nil {
return err return err
} }
@ -378,7 +376,7 @@ func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name // ensureNoNodeWithSimilarNameTxn checks that no other node has conflict in its name
// If allowClashWithoutID then, getting a conflict on another node without ID will be allowed // If allowClashWithoutID then, getting a conflict on another node without ID will be allowed
func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWithoutID bool) error { func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWithoutID bool) error {
// Retrieve all of the nodes // Retrieve all of the nodes
enodes, err := tx.Get("nodes", "id") enodes, err := tx.Get("nodes", "id")
if err != nil { if err != nil {
@ -414,7 +412,7 @@ func ensureNoNodeWithSimilarNameTxn(tx *txn, node *structs.Node, allowClashWitho
// ensureNodeCASTxn updates a node only if the existing index matches the given index. // ensureNodeCASTxn updates a node only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error. // Returns a bool indicating if a write happened and any error.
func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool, error) { func (s *Store) ensureNodeCASTxn(tx WriteTxn, idx uint64, node *structs.Node) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
existing, err := getNodeTxn(tx, node.Node) existing, err := getNodeTxn(tx, node.Node)
if err != nil { if err != nil {
@ -444,7 +442,7 @@ func (s *Store) ensureNodeCASTxn(tx *txn, idx uint64, node *structs.Node) (bool,
// ensureNodeTxn is the inner function called to actually create a node // ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows // registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn. // passing in a memdb transaction so it may be part of a larger txn.
func (s *Store) ensureNodeTxn(tx *txn, idx uint64, preserveIndexes bool, node *structs.Node) error { func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, node *structs.Node) error {
// See if there's an existing node with this UUID, and make sure the // See if there's an existing node with this UUID, and make sure the
// name is the same. // name is the same.
var n *structs.Node var n *structs.Node
@ -546,7 +544,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
return idx, node, nil return idx, node, nil
} }
func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) { func getNodeTxn(tx ReadTxn, nodeName string) (*structs.Node, error) {
node, err := tx.First("nodes", "id", nodeName) node, err := tx.First("nodes", "id", nodeName)
if err != nil { if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err) return nil, fmt.Errorf("node lookup failed: %s", err)
@ -557,7 +555,7 @@ func getNodeTxn(tx *txn, nodeName string) (*structs.Node, error) {
return nil, nil return nil, nil
} }
func getNodeIDTxn(tx *txn, id types.NodeID) (*structs.Node, error) { func getNodeIDTxn(tx ReadTxn, id types.NodeID) (*structs.Node, error) {
strnode := string(id) strnode := string(id)
uuidValue, err := uuid.ParseUUID(strnode) uuidValue, err := uuid.ParseUUID(strnode)
if err != nil { if err != nil {
@ -657,7 +655,7 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
// deleteNodeCASTxn is used to try doing a node delete operation with a given // deleteNodeCASTxn is used to try doing a node delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked. // the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bool, error) { func (s *Store) deleteNodeCASTxn(tx WriteTxn, idx, cidx uint64, nodeName string) (bool, error) {
// Look up the node. // Look up the node.
node, err := getNodeTxn(tx, nodeName) node, err := getNodeTxn(tx, nodeName)
if err != nil { if err != nil {
@ -684,7 +682,7 @@ func (s *Store) deleteNodeCASTxn(tx *txn, idx, cidx uint64, nodeName string) (bo
// deleteNodeTxn is the inner method used for removing a node from // deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction. // the store within a given transaction.
func (s *Store) deleteNodeTxn(tx *txn, idx uint64, nodeName string) error { func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
// Look up the node. // Look up the node.
node, err := tx.First("nodes", "id", nodeName) node, err := tx.First("nodes", "id", nodeName)
if err != nil { if err != nil {
@ -791,7 +789,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
// ensureServiceCASTxn updates a service only if the existing index matches the given index. // ensureServiceCASTxn updates a service only if the existing index matches the given index.
// Returns an error if the write didn't happen and nil if write was successful. // Returns an error if the write didn't happen and nil if write was successful.
func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeService) error { func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
// Retrieve the existing service. // Retrieve the existing service.
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil { if err != nil {
@ -816,7 +814,7 @@ func ensureServiceCASTxn(tx *txn, idx uint64, node string, svc *structs.NodeServ
// ensureServiceTxn is used to upsert a service registration within an // ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction. // existing memdb transaction.
func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
// Check for existing service // Check for existing service
_, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID) _, existing, err := firstWatchCompoundWithTxn(tx, "services", "id", &svc.EnterpriseMeta, node, svc.ID)
if err != nil { if err != nil {
@ -921,7 +919,7 @@ func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta)
return serviceListTxn(tx, ws, entMeta) return serviceListTxn(tx, ws, entMeta)
} }
func serviceListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) { func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
idx := catalogServicesMaxIndex(tx, entMeta) idx := catalogServicesMaxIndex(tx, entMeta)
services, err := catalogServiceList(tx, entMeta, true) services, err := catalogServiceList(tx, entMeta, true)
@ -1025,7 +1023,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
// * return when the last instance of a service is removed // * return when the last instance of a service is removed
// * block until an instance for this service is available, or another // * block until an instance for this service is available, or another
// service is unregistered. // service is unregistered.
func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 { func maxIndexForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) uint64 {
idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta) idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks, entMeta)
return idx return idx
} }
@ -1044,7 +1042,7 @@ func maxIndexForService(tx *txn, serviceName string, serviceExists, checks bool,
// returned for the chan. This allows for blocking watchers to _only_ watch this // returned for the chan. This allows for blocking watchers to _only_ watch this
// one chan in the common case, falling back to watching all touched MemDB // one chan in the common case, falling back to watching all touched MemDB
// indexes in more complicated cases. // indexes in more complicated cases.
func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) { func maxIndexAndWatchChForService(tx ReadTxn, serviceName string, serviceExists, checks bool, entMeta *structs.EnterpriseMeta) (uint64, <-chan struct{}) {
if !serviceExists { if !serviceExists {
res, err := catalogServiceLastExtinctionIndex(tx, entMeta) res, err := catalogServiceLastExtinctionIndex(tx, entMeta)
if missingIdx, ok := res.(*IndexEntry); ok && err == nil { if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
@ -1061,7 +1059,7 @@ func maxIndexAndWatchChForService(tx *txn, serviceName string, serviceExists, ch
} }
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes // Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
func maxIndexAndWatchChsForServiceNodes(tx *txn, func maxIndexAndWatchChsForServiceNodes(tx ReadTxn,
nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) { nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) {
var watchChans []<-chan struct{} var watchChans []<-chan struct{}
@ -1268,7 +1266,7 @@ func (s *Store) ServiceAddressNodes(ws memdb.WatchSet, address string, entMeta *
// parseServiceNodes iterates over a services query and fills in the node details, // parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice. // returning a ServiceNodes slice.
func parseServiceNodes(tx *txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { func parseServiceNodes(tx ReadTxn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a // We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback. // top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id") allNodes, err := tx.Get("nodes", "id")
@ -1325,7 +1323,7 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
return idx, service, nil return idx, service, nil
} }
func getNodeServiceTxn(tx *txn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// Query the service // Query the service
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil { if err != nil {
@ -1467,7 +1465,7 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string, entMeta *s
// deleteServiceCASTxn is used to try doing a service delete operation with a given // deleteServiceCASTxn is used to try doing a service delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given service, then the call is a noop, otherwise a normal delete is invoked. // the given service, then the call is a noop, otherwise a normal delete is invoked.
func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) deleteServiceCASTxn(tx WriteTxn, idx, cidx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Look up the service. // Look up the service.
service, err := getNodeServiceTxn(tx, nodeName, serviceID, entMeta) service, err := getNodeServiceTxn(tx, nodeName, serviceID, entMeta)
if err != nil { if err != nil {
@ -1494,7 +1492,7 @@ func (s *Store) deleteServiceCASTxn(tx *txn, idx, cidx uint64, nodeName, service
// deleteServiceTxn is the inner method called to remove a service // deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction. // registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) error {
// Look up the service. // Look up the service.
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil { if err != nil {
@ -1589,7 +1587,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
} }
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error { func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error {
services, err := tx.Get("services", "node", nodeID) services, err := tx.Get("services", "node", nodeID)
if err != nil { if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
@ -1608,7 +1606,7 @@ func updateAllServiceIndexesOfNode(tx *txn, idx uint64, nodeID string) error {
// ensureCheckCASTxn updates a check only if the existing index matches the given index. // ensureCheckCASTxn updates a check only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error. // Returns a bool indicating if a write happened and any error.
func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck) (bool, error) { func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthCheck) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
_, existing, err := getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta) _, existing, err := getNodeCheckTxn(tx, hc.Node, hc.CheckID, &hc.EnterpriseMeta)
if err != nil { if err != nil {
@ -1638,7 +1636,7 @@ func (s *Store) ensureCheckCASTxn(tx *txn, idx uint64, hc *structs.HealthCheck)
// ensureCheckTxn is used as the inner method to handle inserting // ensureCheckTxn is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting // a health check into the state store. It ensures safety against inserting
// checks with no matching node or service. // checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx *txn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check // Check if we have an existing health check
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil { if err != nil {
@ -1743,7 +1741,7 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID, entMeta *struc
// nodeCheckTxn is used as the inner method to handle reading a health check // nodeCheckTxn is used as the inner method to handle reading a health check
// from the state store. // from the state store.
func getNodeCheckTxn(tx *txn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) { func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (uint64, *structs.HealthCheck, error) {
// Get the table index. // Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta) idx := catalogChecksMaxIndex(tx, entMeta)
@ -1859,7 +1857,7 @@ func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters
return parseChecksByNodeMeta(tx, ws, idx, iter, filters) return parseChecksByNodeMeta(tx, ws, idx, iter, filters)
} }
func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) { func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *structs.EnterpriseMeta) (uint64, memdb.ResultIterator, error) {
// Get the table index. // Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta) idx := catalogChecksMaxIndex(tx, entMeta)
@ -1881,7 +1879,7 @@ func checksInStateTxn(tx *txn, ws memdb.WatchSet, state string, entMeta *structs
// parseChecksByNodeMeta is a helper function used to deduplicate some // parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields. // repetitive code for returning health checks filtered by node metadata fields.
func parseChecksByNodeMeta(tx *txn, ws memdb.WatchSet, func parseChecksByNodeMeta(tx ReadTxn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a // We don't want to track an unlimited number of nodes, so we pull a
@ -1930,7 +1928,7 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID, entM
// deleteCheckCASTxn is used to try doing a check delete operation with a given // deleteCheckCASTxn is used to try doing a check delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked. // the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) (bool, error) {
// Try to retrieve the existing health check. // Try to retrieve the existing health check.
_, hc, err := getNodeCheckTxn(tx, node, checkID, entMeta) _, hc, err := getNodeCheckTxn(tx, node, checkID, entMeta)
if err != nil { if err != nil {
@ -1957,7 +1955,7 @@ func (s *Store) deleteCheckCASTxn(tx *txn, idx, cidx uint64, node string, checkI
// deleteCheckTxn is the inner method used to call a health // deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction. // check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
// Try to retrieve the existing health check. // Try to retrieve the existing health check.
_, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID)) _, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID))
if err != nil { if err != nil {
@ -2101,7 +2099,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
return checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta) return checkServiceNodesTxn(tx, ws, serviceName, connect, entMeta)
} }
func checkServiceNodesTxn(tx *txn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Function for lookup // Function for lookup
index := "service" index := "service"
if connect { if connect {
@ -2279,7 +2277,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
// and query for an associated node and a set of checks. This is the inner // and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query. // method used to return a rich set of results from a more simple query.
func parseCheckServiceNodes( func parseCheckServiceNodes(
tx *txn, ws memdb.WatchSet, idx uint64, tx ReadTxn, ws memdb.WatchSet, idx uint64,
services structs.ServiceNodes, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) { err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil { if err != nil {
@ -2404,7 +2402,7 @@ func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind
} }
} }
func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func serviceDumpAllTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// Get the table index // Get the table index
idx := catalogMaxIndexWatch(tx, ws, entMeta, true) idx := catalogMaxIndexWatch(tx, ws, entMeta, true)
@ -2422,7 +2420,7 @@ func serviceDumpAllTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMe
return parseCheckServiceNodes(tx, nil, idx, results, err) return parseCheckServiceNodes(tx, nil, idx, results, err)
} }
func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
// unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks) // unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks)
// updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual // updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual
// entries // entries
@ -2446,7 +2444,7 @@ func serviceDumpKindTxn(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, en
// parseNodes takes an iterator over a set of nodes and returns a struct // parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services // containing the nodes along with all of their associated services
// and/or health checks. // and/or health checks.
func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64, func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) { iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) {
// We don't want to track an unlimited number of services, so we pull a // We don't want to track an unlimited number of services, so we pull a
@ -2506,7 +2504,7 @@ func parseNodes(tx *txn, ws memdb.WatchSet, idx uint64,
} }
// checkSessionsTxn returns the IDs of all sessions associated with a health check // checkSessionsTxn returns the IDs of all sessions associated with a health check
func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error) { func checkSessionsTxn(tx ReadTxn, hc *structs.HealthCheck) ([]*sessionCheck, error) {
mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) mappings, err := getCompoundWithTxn(tx, "session_checks", "node_check", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed session checks lookup: %s", err) return nil, fmt.Errorf("failed session checks lookup: %s", err)
@ -2520,7 +2518,7 @@ func checkSessionsTxn(tx *txn, hc *structs.HealthCheck) ([]*sessionCheck, error)
} }
// updateGatewayServices associates services with gateways as specified in a gateway config entry // updateGatewayServices associates services with gateways as specified in a gateway config entry
func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error { func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
var ( var (
noChange bool noChange bool
gatewayServices structs.GatewayServices gatewayServices structs.GatewayServices
@ -2582,7 +2580,7 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet
// insertion into the memdb table, specific to ingress gateways. The boolean // insertion into the memdb table, specific to ingress gateways. The boolean
// returned indicates that there are no changes necessary to the memdb table. // returned indicates that there are no changes necessary to the memdb table.
func ingressConfigGatewayServices( func ingressConfigGatewayServices(
tx *txn, tx ReadTxn,
gateway structs.ServiceName, gateway structs.ServiceName,
conf structs.ConfigEntry, conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
@ -2627,7 +2625,7 @@ func ingressConfigGatewayServices(
// boolean returned indicates that there are no changes necessary to the memdb // boolean returned indicates that there are no changes necessary to the memdb
// table. // table.
func terminatingConfigGatewayServices( func terminatingConfigGatewayServices(
tx *txn, tx ReadTxn,
gateway structs.ServiceName, gateway structs.ServiceName,
conf structs.ConfigEntry, conf structs.ConfigEntry,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
@ -2667,7 +2665,7 @@ func terminatingConfigGatewayServices(
} }
// updateGatewayNamespace is used to target all services within a namespace // updateGatewayNamespace is used to target all services within a namespace
func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error { func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta) services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta)
if err != nil { if err != nil {
return fmt.Errorf("failed querying services: %s", err) return fmt.Errorf("failed querying services: %s", err)
@ -2714,7 +2712,7 @@ func updateGatewayNamespace(tx *txn, idx uint64, service *structs.GatewayService
// updateGatewayService associates services with gateways after an eligible event // updateGatewayService associates services with gateways after an eligible event
// ie. Registering a service in a namespace targeted by a gateway // ie. Registering a service in a namespace targeted by a gateway
func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) error { func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table // Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed // Avoid insert if nothing changed
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port) existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port)
@ -2749,7 +2747,7 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService)
// checkWildcardForGatewaysAndUpdate checks whether a service matches a // checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the // wildcard definition in gateway config entries and if so adds it the the
// gateway-services table. // gateway-services table.
func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeService) error { func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeService) error {
// Do not associate non-typical services with gateways or consul services // Do not associate non-typical services with gateways or consul services
if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" { if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" {
return nil return nil
@ -2776,7 +2774,7 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic
return nil return nil
} }
func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error { func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed // Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta) gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil { if err != nil {
@ -2805,11 +2803,11 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service. // all the gateways mapped to this service.
func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta)) return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta))
} }
func gatewayServices(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)) return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta))
} }
@ -2832,7 +2830,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS
return lib.MaxUint64(maxIdx, idx), results, nil return lib.MaxUint64(maxIdx, idx), results, nil
} }
func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) { func (s *Store) collectGatewayServices(tx ReadTxn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) {
var maxIdx uint64 var maxIdx uint64
var results structs.GatewayServices var results structs.GatewayServices
@ -2858,7 +2856,7 @@ func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.Re
// TODO(ingress): How to handle index rolling back when a config entry is // TODO(ingress): How to handle index rolling back when a config entry is
// deleted that references a service? // deleted that references a service?
// We might need something like the service_last_extinction index? // We might need something like the service_last_extinction index?
func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Look up gateway name associated with the service // Look up gateway name associated with the service
gws, err := serviceGateways(tx, service, entMeta) gws, err := serviceGateways(tx, service, entMeta)
if err != nil { if err != nil {
@ -3060,7 +3058,7 @@ func (s *Store) ServiceTopology(
// combinedServiceNodesTxn returns typical and connect endpoints for a list of services. // combinedServiceNodesTxn returns typical and connect endpoints for a list of services.
// This enabled aggregating checks statuses across both. // This enabled aggregating checks statuses across both.
func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) { func (s *Store) combinedServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var ( var (
maxIdx uint64 maxIdx uint64
resp structs.CheckServiceNodes resp structs.CheckServiceNodes
@ -3177,7 +3175,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
} }
// updateMeshTopology creates associations between the input service and its upstreams in the topology table // updateMeshTopology creates associations between the input service and its upstreams in the topology table
func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error { func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error {
oldUpstreams := make(map[structs.ServiceName]bool) oldUpstreams := make(map[structs.ServiceName]bool)
if e, ok := existing.(*structs.ServiceNode); ok { if e, ok := existing.(*structs.ServiceNode); ok {
for _, u := range e.ServiceProxy.Upstreams { for _, u := range e.ServiceProxy.Upstreams {
@ -3257,7 +3255,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi
// cleanupMeshTopology removes a service from the mesh topology table // cleanupMeshTopology removes a service from the mesh topology table
// This is only safe to call when there are no more known instances of this proxy // This is only safe to call when there are no more known instances of this proxy
func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error { func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) error {
if service.ServiceKind != structs.ServiceKindConnectProxy { if service.ServiceKind != structs.ServiceKindConnectProxy {
return nil return nil
} }
@ -3294,7 +3292,7 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro
return nil return nil
} }
func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz // Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier { if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier {
return nil return nil
@ -3315,7 +3313,7 @@ func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa
return nil return nil
} }
func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz // Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway { if gs.GatewayKind != structs.ServiceKindIngressGateway {
return nil return nil
@ -3331,7 +3329,7 @@ func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.Gatewa
return nil return nil
} }
func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error { func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway structs.ServiceName, kind string) error {
// Only ingress gateways are standalone items in the mesh topology viz // Only ingress gateways are standalone items in the mesh topology viz
if kind != string(structs.ServiceKindIngressGateway) { if kind != string(structs.ServiceKindIngressGateway) {
return nil return nil

View File

@ -168,7 +168,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
} }
} }
func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// overall services index // overall services index
if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil { if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -177,7 +177,7 @@ func catalogUpdateServicesIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta
return nil return nil
} }
func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateServiceKindIndexes(tx WriteTxn, kind structs.ServiceKind, idx uint64, _ *structs.EnterpriseMeta) error {
// service-kind index // service-kind index
if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil { if err := indexUpdateMaxTxn(tx, idx, serviceKindIndexName(kind, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -186,7 +186,7 @@ func catalogUpdateServiceKindIndexes(tx *txn, kind structs.ServiceKind, idx uint
return nil return nil
} }
func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _ *structs.EnterpriseMeta) error {
// per-service index // per-service index
if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil { if err := indexUpdateMaxTxn(tx, idx, serviceIndexName(serviceName, nil)); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -195,14 +195,14 @@ func catalogUpdateServiceIndexes(tx *txn, serviceName string, idx uint64, _ *str
return nil return nil
} }
func catalogUpdateServiceExtinctionIndex(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err) return fmt.Errorf("failed updating missing service extinction index: %s", err)
} }
return nil return nil
} }
func catalogInsertService(tx *txn, svc *structs.ServiceNode) error { func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
// Insert the service and update the index // Insert the service and update the index
if err := tx.Insert("services", svc); err != nil { if err := tx.Insert("services", svc); err != nil {
return fmt.Errorf("failed inserting service: %s", err) return fmt.Errorf("failed inserting service: %s", err)
@ -269,7 +269,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe
return maxIndexWatchTxn(tx, ws, "nodes", "services") return maxIndexWatchTxn(tx, ws, "nodes", "services")
} }
func catalogUpdateCheckIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry // update the universal index entry
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
@ -306,7 +306,7 @@ func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *struct
return tx.Get("checks", "node_service", node, service) return tx.Get("checks", "node_service", node, service)
} }
func catalogInsertCheck(tx *txn, chk *structs.HealthCheck, idx uint64) error { func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check // Insert the check
if err := tx.Insert("checks", chk); err != nil { if err := tx.Insert("checks", chk); err != nil {
return fmt.Errorf("failed inserting check: %s", err) return fmt.Errorf("failed inserting check: %s", err)
@ -323,7 +323,7 @@ func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMet
return tx.Get("checks", "node_service", node, service) return tx.Get("checks", "node_service", node, service)
} }
func validateRegisterRequestTxn(_ *txn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) { func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
return nil, nil return nil, nil
} }

View File

@ -4415,12 +4415,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit()) require.NoError(t, tx.Commit())
// ensure no update happened // ensure no update happened
tx = s.db.Txn(false) roTxn := s.db.Txn(false)
_, nsRead, err := s.NodeService("node1", "foo", nil) _, nsRead, err := s.NodeService("node1", "foo", nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, nsRead) require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex) require.Equal(t, uint64(2), nsRead.ModifyIndex)
require.NoError(t, tx.Commit()) roTxn.Commit()
ns.ModifyIndex = 99 ns.ModifyIndex = 99
// attempt to update with a non-matching index // attempt to update with a non-matching index
@ -4430,12 +4430,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit()) require.NoError(t, tx.Commit())
// ensure no update happened // ensure no update happened
tx = s.db.Txn(false) roTxn = s.db.Txn(false)
_, nsRead, err = s.NodeService("node1", "foo", nil) _, nsRead, err = s.NodeService("node1", "foo", nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, nsRead) require.NotNil(t, nsRead)
require.Equal(t, uint64(2), nsRead.ModifyIndex) require.Equal(t, uint64(2), nsRead.ModifyIndex)
require.NoError(t, tx.Commit()) roTxn.Commit()
ns.ModifyIndex = 2 ns.ModifyIndex = 2
// update with the matching modify index // update with the matching modify index
@ -4445,12 +4445,12 @@ func TestStateStore_ensureServiceCASTxn(t *testing.T) {
require.NoError(t, tx.Commit()) require.NoError(t, tx.Commit())
// ensure the update happened // ensure the update happened
tx = s.db.Txn(false) roTxn = s.db.Txn(false)
_, nsRead, err = s.NodeService("node1", "foo", nil) _, nsRead, err = s.NodeService("node1", "foo", nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, nsRead) require.NotNil(t, nsRead)
require.Equal(t, uint64(7), nsRead.ModifyIndex) require.Equal(t, uint64(7), nsRead.ModifyIndex)
require.NoError(t, tx.Commit()) roTxn.Commit()
} }
func TestStateStore_GatewayServices_Terminating(t *testing.T) { func TestStateStore_GatewayServices_Terminating(t *testing.T) {

View File

@ -1,12 +1,12 @@
package state package state
import ( import (
"encoding/json"
"fmt" "fmt"
"sort" "sort"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
) )
type ServiceIntentionLegacyIDIndex struct { type ServiceIntentionLegacyIDIndex struct {
@ -123,7 +123,7 @@ func (s *ServiceIntentionSourceIndex) FromArgs(args ...interface{}) ([]byte, err
return []byte(arg.String() + "\x00"), nil return []byte(arg.String() + "\x00"), nil
} }
func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { func (s *Store) configIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
// unrolled part of configEntriesByKindTxn // unrolled part of configEntriesByKindTxn
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
@ -144,7 +144,7 @@ func (s *Store) configIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *str
return idx, results, true, nil return idx, results, true, nil
} }
func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { func (s *Store) configIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
if idx < 1 { if idx < 1 {
idx = 1 idx = 1
@ -173,7 +173,7 @@ func (s *Store) configIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (ui
return idx, nil, nil, nil // Shouldn't happen. return idx, nil, nil, nil // Shouldn't happen.
} }
func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) { func (s *Store) configIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.ServiceIntentionsConfigEntry, *structs.Intention, error) {
if err := args.Validate(); err != nil { if err := args.Validate(); err != nil {
return 0, nil, nil, err return 0, nil, nil, err
} }
@ -196,7 +196,7 @@ func (s *Store) configIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *str
return idx, nil, nil, nil return idx, nil, nil, nil
} }
func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { func (s *Store) configIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
maxIndex := uint64(1) maxIndex := uint64(1)
// Make all the calls and accumulate the results // Make all the calls and accumulate the results
@ -207,7 +207,7 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
// improving that in the future, the test cases shouldn't have to // improving that in the future, the test cases shouldn't have to
// change for that. // change for that.
index, ixns, err := s.configIntentionMatchOneTxn(tx, ws, entry, args.Type) index, ixns, err := configIntentionMatchOneTxn(tx, ws, entry, args.Type)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -222,23 +222,23 @@ func (s *Store) configIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
return maxIndex, results, nil return maxIndex, results, nil
} }
func (s *Store) configIntentionMatchOneTxn( func configIntentionMatchOneTxn(
tx *txn, tx ReadTxn,
ws memdb.WatchSet, ws memdb.WatchSet,
matchEntry structs.IntentionMatchEntry, matchEntry structs.IntentionMatchEntry,
matchType structs.IntentionMatchType, matchType structs.IntentionMatchType,
) (uint64, structs.Intentions, error) { ) (uint64, structs.Intentions, error) {
switch matchType { switch matchType {
case structs.IntentionMatchSource: case structs.IntentionMatchSource:
return s.readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) return readSourceIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
case structs.IntentionMatchDestination: case structs.IntentionMatchDestination:
return s.readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta()) return readDestinationIntentionsFromConfigEntriesTxn(tx, ws, matchEntry.Name, matchEntry.GetEnterpriseMeta())
default: default:
return 0, nil, fmt.Errorf("invalid intention match type: %s", matchType) return 0, nil, fmt.Errorf("invalid intention match type: %s", matchType)
} }
} }
func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) { func readSourceIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
var ( var (
@ -248,7 +248,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch
names := getIntentionPrecedenceMatchServiceNames(serviceName, entMeta) names := getIntentionPrecedenceMatchServiceNames(serviceName, entMeta)
for _, sn := range names { for _, sn := range names {
results, err = s.readSourceIntentionsFromConfigEntriesForServiceTxn( results, err = readSourceIntentionsFromConfigEntriesForServiceTxn(
tx, ws, sn.Name, &sn.EnterpriseMeta, results, tx, ws, sn.Name, &sn.EnterpriseMeta, results,
) )
if err != nil { if err != nil {
@ -262,7 +262,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.Watch
return idx, results, nil return idx, results, nil
} }
func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) { func readSourceIntentionsFromConfigEntriesForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta, results structs.Intentions) (structs.Intentions, error) {
sn := structs.NewServiceName(serviceName, entMeta) sn := structs.NewServiceName(serviceName, entMeta)
iter, err := tx.Get(configTableName, "intention-source", sn) iter, err := tx.Get(configTableName, "intention-source", sn)
@ -283,12 +283,7 @@ func (s *Store) readSourceIntentionsFromConfigEntriesForServiceTxn(tx *txn, ws m
return results, nil return results, nil
} }
func jd(v interface{}) string { func readDestinationIntentionsFromConfigEntriesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
d, _ := json.MarshalIndent(v, "", " ")
return string(d)
}
func (s *Store) readDestinationIntentionsFromConfigEntriesTxn(tx *txn, ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, error) {
idx := maxIndexTxn(tx, configTableName) idx := maxIndexTxn(tx, configTableName)
var results structs.Intentions var results structs.Intentions

View File

@ -116,7 +116,7 @@ func (s *Store) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, e
return caConfigTxn(tx, ws) return caConfigTxn(tx, ws)
} }
func caConfigTxn(tx *txn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { func caConfigTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
// Get the CA config // Get the CA config
ch, c, err := tx.FirstWatch(caConfigTableName, "id") ch, c, err := tx.FirstWatch(caConfigTableName, "id")
if err != nil { if err != nil {
@ -236,7 +236,7 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) {
return caRootsTxn(tx, ws) return caRootsTxn(tx, ws)
} }
func caRootsTxn(tx *txn, ws memdb.WatchSet) (uint64, structs.CARoots, error) { func caRootsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, structs.CARoots, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, caRootTableName) idx := maxIndexTxn(tx, caRootTableName)

View File

@ -134,7 +134,7 @@ func (s *Store) FederationStateGet(ws memdb.WatchSet, datacenter string) (uint64
return federationStateGetTxn(tx, ws, datacenter) return federationStateGetTxn(tx, ws, datacenter)
} }
func federationStateGetTxn(tx *txn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) { func federationStateGetTxn(tx ReadTxn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, federationStateTableName) idx := maxIndexTxn(tx, federationStateTableName)
@ -164,7 +164,7 @@ func (s *Store) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.Feder
return federationStateListTxn(tx, ws) return federationStateListTxn(tx, ws)
} }
func federationStateListTxn(tx *txn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) { func federationStateListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, federationStateTableName) idx := maxIndexTxn(tx, federationStateTableName)

View File

@ -28,7 +28,7 @@ func NewGraveyard(gc *TombstoneGC) *Graveyard {
} }
// InsertTxn adds a new tombstone. // InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error { func (g *Graveyard) InsertTxn(tx WriteTxn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error {
stone := &Tombstone{ stone := &Tombstone{
Key: key, Key: key,
Index: idx, Index: idx,
@ -51,7 +51,7 @@ func (g *Graveyard) InsertTxn(tx *txn, key string, idx uint64, entMeta *structs.
// GetMaxIndexTxn returns the highest index tombstone whose key matches the // GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match. // given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) { func (g *Graveyard) GetMaxIndexTxn(tx ReadTxn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) {
stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta) stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta)
if err != nil { if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err) return 0, fmt.Errorf("failed querying tombstones: %s", err)
@ -68,7 +68,7 @@ func (g *Graveyard) GetMaxIndexTxn(tx *txn, prefix string, entMeta *structs.Ente
} }
// DumpTxn returns all the tombstones. // DumpTxn returns all the tombstones.
func (g *Graveyard) DumpTxn(tx *txn) (memdb.ResultIterator, error) { func (g *Graveyard) DumpTxn(tx ReadTxn) (memdb.ResultIterator, error) {
iter, err := tx.Get("tombstones", "id") iter, err := tx.Get("tombstones", "id")
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -6,9 +6,7 @@ import (
"fmt" "fmt"
) )
func (g *Graveyard) insertTombstoneWithTxn(tx *txn, func (g *Graveyard) insertTombstoneWithTxn(tx WriteTxn, _ string, stone *Tombstone, updateMax bool) error {
table string, stone *Tombstone, updateMax bool) error {
if err := tx.Insert("tombstones", stone); err != nil { if err := tx.Insert("tombstones", stone); err != nil {
return err return err
} }

View File

@ -5,10 +5,11 @@ import (
"fmt" "fmt"
"sort" "sort"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
) )
const ( const (
@ -142,7 +143,7 @@ func (s *Store) AreIntentionsInConfigEntries() (bool, error) {
return areIntentionsInConfigEntries(tx) return areIntentionsInConfigEntries(tx)
} }
func areIntentionsInConfigEntries(tx *txn) (bool, error) { func areIntentionsInConfigEntries(tx ReadTxn) (bool, error) {
_, entry, err := systemMetadataGetTxn(tx, nil, structs.SystemMetadataIntentionFormatKey) _, entry, err := systemMetadataGetTxn(tx, nil, structs.SystemMetadataIntentionFormatKey)
if err != nil { if err != nil {
return false, fmt.Errorf("failed system metadatalookup: %s", err) return false, fmt.Errorf("failed system metadatalookup: %s", err)
@ -178,7 +179,7 @@ func (s *Store) Intentions(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (
return s.configIntentionsListTxn(tx, ws, entMeta) return s.configIntentionsListTxn(tx, ws, entMeta)
} }
func (s *Store) legacyIntentionsListTxn(tx *txn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { func (s *Store) legacyIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) {
// Get the index // Get the index
idx := maxIndexTxn(tx, intentionsTableName) idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 { if idx < 1 {
@ -230,7 +231,7 @@ func (s *Store) LegacyIntentionSet(idx uint64, ixn *structs.Intention) error {
// legacyIntentionSetTxn is the inner method used to insert an intention with // legacyIntentionSetTxn is the inner method used to insert an intention with
// the proper indexes into the state store. // the proper indexes into the state store.
func legacyIntentionSetTxn(tx *txn, idx uint64, ixn *structs.Intention) error { func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) error {
// ID is required // ID is required
if ixn.ID == "" { if ixn.ID == "" {
return ErrMissingIntentionID return ErrMissingIntentionID
@ -301,7 +302,7 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Ser
return s.configIntentionGetTxn(tx, ws, id) return s.configIntentionGetTxn(tx, ws, id)
} }
func (s *Store) legacyIntentionGetTxn(tx *txn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) { func (s *Store) legacyIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) {
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, intentionsTableName) idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 { if idx < 1 {
@ -340,7 +341,7 @@ func (s *Store) IntentionGetExact(ws memdb.WatchSet, args *structs.IntentionQuer
return s.configIntentionGetExactTxn(tx, ws, args) return s.configIntentionGetExactTxn(tx, ws, args)
} }
func (s *Store) legacyIntentionGetExactTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) { func (s *Store) legacyIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryExact) (uint64, *structs.Intention, error) {
if err := args.Validate(); err != nil { if err := args.Validate(); err != nil {
return 0, nil, err return 0, nil, err
} }
@ -392,7 +393,7 @@ func (s *Store) LegacyIntentionDelete(idx uint64, id string) error {
// legacyIntentionDeleteTxn is the inner method used to delete a legacy intention // legacyIntentionDeleteTxn is the inner method used to delete a legacy intention
// with the proper indexes into the state store. // with the proper indexes into the state store.
func legacyIntentionDeleteTxn(tx *txn, idx uint64, queryID string) error { func legacyIntentionDeleteTxn(tx WriteTxn, idx uint64, queryID string) error {
// Pull the query. // Pull the query.
wrapped, err := tx.First(intentionsTableName, "id", queryID) wrapped, err := tx.First(intentionsTableName, "id", queryID)
if err != nil { if err != nil {
@ -531,7 +532,7 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa
return s.configIntentionMatchTxn(tx, ws, args) return s.configIntentionMatchTxn(tx, ws, args)
} }
func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { func (s *Store) legacyIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) {
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, intentionsTableName) idx := maxIndexTxn(tx, intentionsTableName)
if idx < 1 { if idx < 1 {
@ -541,7 +542,7 @@ func (s *Store) legacyIntentionMatchTxn(tx *txn, ws memdb.WatchSet, args *struct
// Make all the calls and accumulate the results // Make all the calls and accumulate the results
results := make([]structs.Intentions, len(args.Entries)) results := make([]structs.Intentions, len(args.Entries))
for i, entry := range args.Entries { for i, entry := range args.Entries {
ixns, err := s.intentionMatchOneTxn(tx, ws, entry, args.Type) ixns, err := intentionMatchOneTxn(tx, ws, entry, args.Type)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -575,13 +576,13 @@ func (s *Store) IntentionMatchOne(
return 0, nil, err return 0, nil, err
} }
if !usingConfigEntries { if !usingConfigEntries {
return s.legacyIntentionMatchOneTxn(tx, ws, entry, matchType) return legacyIntentionMatchOneTxn(tx, ws, entry, matchType)
} }
return s.configIntentionMatchOneTxn(tx, ws, entry, matchType) return configIntentionMatchOneTxn(tx, ws, entry, matchType)
} }
func (s *Store) legacyIntentionMatchOneTxn( func legacyIntentionMatchOneTxn(
tx *txn, tx ReadTxn,
ws memdb.WatchSet, ws memdb.WatchSet,
entry structs.IntentionMatchEntry, entry structs.IntentionMatchEntry,
matchType structs.IntentionMatchType, matchType structs.IntentionMatchType,
@ -592,7 +593,7 @@ func (s *Store) legacyIntentionMatchOneTxn(
idx = 1 idx = 1
} }
results, err := s.intentionMatchOneTxn(tx, ws, entry, matchType) results, err := intentionMatchOneTxn(tx, ws, entry, matchType)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -602,7 +603,7 @@ func (s *Store) legacyIntentionMatchOneTxn(
return idx, results, nil return idx, results, nil
} }
func (s *Store) intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet, func intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet,
entry structs.IntentionMatchEntry, matchType structs.IntentionMatchType) (structs.Intentions, error) { entry structs.IntentionMatchEntry, matchType structs.IntentionMatchType) (structs.Intentions, error) {
// Each search entry may require multiple queries to memdb, so this // Each search entry may require multiple queries to memdb, so this

View File

@ -7,7 +7,7 @@ import (
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
) )
func intentionListTxn(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func intentionListTxn(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// Get all intentions // Get all intentions
return tx.Get(intentionsTableName, "id") return tx.Get(intentionsTableName, "id")
} }

View File

@ -117,7 +117,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
// If updateSession is true, then the incoming entry will set the new // If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep // session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is. // whatever the existing session is.
func kvsSetTxn(tx *txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { func kvsSetTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair // Retrieve an existing KV pair
existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil { if err != nil {
@ -170,7 +170,7 @@ func (s *Store) KVSGet(ws memdb.WatchSet, key string, entMeta *structs.Enterpris
// kvsGetTxn is the inner method that gets a KVS entry inside an existing // kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction. // transaction.
func kvsGetTxn(tx *txn, func kvsGetTxn(tx ReadTxn,
ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) { ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) {
// Get the table index. // Get the table index.
@ -203,7 +203,7 @@ func (s *Store) KVSList(ws memdb.WatchSet,
// kvsListTxn is the inner method that gets a list of KVS entries matching a // kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix. // prefix.
func (s *Store) kvsListTxn(tx *txn, func (s *Store) kvsListTxn(tx ReadTxn,
ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
// Get the table indexes. // Get the table indexes.
@ -252,7 +252,7 @@ func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMet
// kvsDeleteTxn is the inner method used to perform the actual deletion // kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction. // of a key/value pair within an existing transaction.
func (s *Store) kvsDeleteTxn(tx *txn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error { func (s *Store) kvsDeleteTxn(tx WriteTxn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
// Look up the entry in the state store. // Look up the entry in the state store.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil { if err != nil {
@ -289,7 +289,7 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.Ente
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing // kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction. // transaction.
func (s *Store) kvsDeleteCASTxn(tx *txn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) { func (s *Store) kvsDeleteCASTxn(tx WriteTxn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Retrieve the existing kvs entry, if any exists. // Retrieve the existing kvs entry, if any exists.
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil { if err != nil {
@ -330,7 +330,7 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsSetCASTxn is the inner method used to do a CAS inside an existing // kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction. // transaction.
func kvsSetCASTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { func kvsSetCASTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta) existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil { if err != nil {
@ -394,7 +394,7 @@ func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsLockTxn is the inner method that does a lock inside an existing // kvsLockTxn is the inner method that does a lock inside an existing
// transaction. // transaction.
func kvsLockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { func kvsLockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -460,7 +460,7 @@ func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsUnlockTxn is the inner method that does an unlock inside an existing // kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction. // transaction.
func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) { func kvsUnlockTxn(tx WriteTxn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -498,7 +498,7 @@ func kvsUnlockTxn(tx *txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// kvsCheckSessionTxn checks to see if the given session matches the current // kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key. // entry for a key.
func kvsCheckSessionTxn(tx *txn, func kvsCheckSessionTxn(tx WriteTxn,
key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
@ -519,7 +519,7 @@ func kvsCheckSessionTxn(tx *txn,
// kvsCheckIndexTxn checks to see if the given modify index matches the current // kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key. // entry for a key.
func kvsCheckIndexTxn(tx *txn, func kvsCheckIndexTxn(tx WriteTxn,
key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) { key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta) entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)

View File

@ -16,7 +16,7 @@ func kvsIndexer() *memdb.StringFieldIndex {
} }
} }
func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error { func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool) error {
if err := tx.Insert("kvs", entry); err != nil { if err := tx.Insert("kvs", entry); err != nil {
return err return err
} }
@ -33,7 +33,7 @@ func insertKVTxn(tx *txn, entry *structs.DirEntry, updateMax bool) error {
return nil return nil
} }
func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) { func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries var ents structs.DirEntries
var lindex uint64 var lindex uint64
@ -56,7 +56,7 @@ func kvsListEntriesTxn(tx *txn, ws memdb.WatchSet, prefix string, entMeta *struc
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction. // existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error { func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
// For prefix deletes, only insert one tombstone and delete the entire subtree // For prefix deletes, only insert one tombstone and delete the entire subtree
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix) deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil { if err != nil {
@ -77,11 +77,11 @@ func (s *Store) kvsDeleteTreeTxn(tx *txn, idx uint64, prefix string, entMeta *st
return nil return nil
} }
func kvsMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 { func kvsMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "kvs", "tombstones") return maxIndexTxn(tx, "kvs", "tombstones")
} }
func kvsDeleteWithEntry(tx *txn, entry *structs.DirEntry, idx uint64) error { func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error {
// Delete the entry and update the index. // Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil { if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err) return fmt.Errorf("failed deleting kvs entry: %s", err)

View File

@ -13,14 +13,22 @@ type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
First(table, index string, args ...interface{}) (interface{}, error) First(table, index string, args ...interface{}) (interface{}, error)
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
}
// AbortTxn is a ReadTxn that can also be aborted to end the transaction.
type AbortTxn interface {
ReadTxn
Abort() Abort()
} }
// WriteTxn is implemented by memdb.Txn to perform write operations. // WriteTxn is implemented by memdb.Txn to perform write operations.
type WriteTxn interface { type WriteTxn interface {
ReadTxn ReadTxn
Defer(func())
Delete(table string, obj interface{}) error
DeleteAll(table, index string, args ...interface{}) (int, error)
DeletePrefix(table string, index string, prefix string) (bool, error)
Insert(table string, obj interface{}) error Insert(table string, obj interface{}) error
Commit() error
} }
// Changes wraps a memdb.Changes to include the index at which these changes // Changes wraps a memdb.Changes to include the index at which these changes
@ -46,20 +54,16 @@ type changeTrackerDB struct {
// with write=true. // with write=true.
// //
// Deprecated: use either ReadTxn, or WriteTxn. // Deprecated: use either ReadTxn, or WriteTxn.
func (c *changeTrackerDB) Txn(write bool) *txn { func (c *changeTrackerDB) Txn(write bool) *memdb.Txn {
if write { if write {
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)") panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
} }
return c.ReadTxn() return c.ReadTxn()
} }
// ReadTxn returns a read-only transaction which behaves exactly the same as // ReadTxn returns a read-only transaction.
// memdb.Txn func (c *changeTrackerDB) ReadTxn() *memdb.Txn {
// return c.db.Txn(false)
// TODO: this could return a regular memdb.Txn if all the state functions accepted
// the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)}
} }
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.

View File

@ -258,7 +258,7 @@ func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
// preparedQueryDeleteTxn is the inner method used to delete a prepared query // preparedQueryDeleteTxn is the inner method used to delete a prepared query
// with the proper indexes into the state store. // with the proper indexes into the state store.
func preparedQueryDeleteTxn(tx *txn, idx uint64, queryID string) error { func preparedQueryDeleteTxn(tx WriteTxn, idx uint64, queryID string) error {
// Pull the query. // Pull the query.
wrapped, err := tx.First("prepared-queries", "id", queryID) wrapped, err := tx.First("prepared-queries", "id", queryID)
if err != nil { if err != nil {

View File

@ -301,7 +301,7 @@ func (s *Store) SessionDestroy(idx uint64, sessionID string, entMeta *structs.En
// deleteSessionTxn is the inner method, which is used to do the actual // deleteSessionTxn is the inner method, which is used to do the actual
// session deletion and handle session invalidation, etc. // session deletion and handle session invalidation, etc.
func (s *Store) deleteSessionTxn(tx *txn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteSessionTxn(tx WriteTxn, idx uint64, sessionID string, entMeta *structs.EnterpriseMeta) error {
// Look up the session. // Look up the session.
sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta) sess, err := firstWithTxn(tx, "sessions", "id", sessionID, entMeta)
if err != nil { if err != nil {

View File

@ -35,7 +35,7 @@ func nodeChecksIndexer() *memdb.CompoundIndex {
} }
} }
func sessionDeleteWithSession(tx *txn, session *structs.Session, idx uint64) error { func sessionDeleteWithSession(tx WriteTxn, session *structs.Session, idx uint64) error {
if err := tx.Delete("sessions", session); err != nil { if err := tx.Delete("sessions", session); err != nil {
return fmt.Errorf("failed deleting session: %s", err) return fmt.Errorf("failed deleting session: %s", err)
} }
@ -80,11 +80,11 @@ func insertSessionTxn(tx *txn, session *structs.Session, idx uint64, updateMax b
return nil return nil
} }
func allNodeSessionsTxn(tx *txn, node string) (structs.Sessions, error) { func allNodeSessionsTxn(tx ReadTxn, node string) (structs.Sessions, error) {
return nodeSessionsTxn(tx, nil, node, nil) return nodeSessionsTxn(tx, nil, node, nil)
} }
func nodeSessionsTxn(tx *txn, func nodeSessionsTxn(tx ReadTxn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) { ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get("sessions", "node", node) sessions, err := tx.Get("sessions", "node", node)
@ -100,7 +100,7 @@ func nodeSessionsTxn(tx *txn,
return result, nil return result, nil
} }
func sessionMaxIndex(tx *txn, entMeta *structs.EnterpriseMeta) uint64 { func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "sessions") return maxIndexTxn(tx, "sessions")
} }

View File

@ -122,7 +122,7 @@ type Store struct {
// works by starting a read transaction against the whole state store. // works by starting a read transaction against the whole state store.
type Snapshot struct { type Snapshot struct {
store *Store store *Store
tx *txn tx AbortTxn
lastIndex uint64 lastIndex uint64
} }
@ -288,7 +288,7 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to // indexUpdateMaxTxn is used when restoring entries and sets the table's index to
// the given idx only if it's greater than the current index. // the given idx only if it's greater than the current index.
func indexUpdateMaxTxn(tx *txn, idx uint64, table string) error { func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
ti, err := tx.First("index", "id", table) ti, err := tx.First("index", "id", table)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve existing index: %s", err) return fmt.Errorf("failed to retrieve existing index: %s", err)

View File

@ -8,7 +8,7 @@ import (
) )
// txnKVS handles all KV-related operations. // txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry var entry *structs.DirEntry
var err error var err error
@ -110,7 +110,7 @@ func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnRes
} }
// txnSession handles all Session-related operations. // txnSession handles all Session-related operations.
func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error { func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error {
var err error var err error
switch op.Verb { switch op.Verb {
@ -129,7 +129,7 @@ func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
// txnLegacyIntention handles all Intention-related operations. // txnLegacyIntention handles all Intention-related operations.
// //
// Deprecated: see TxnOp.Intention description // Deprecated: see TxnOp.Intention description
func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error { func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) error {
switch op.Op { switch op.Op {
case structs.IntentionOpCreate, structs.IntentionOpUpdate: case structs.IntentionOpCreate, structs.IntentionOpUpdate:
return legacyIntentionSetTxn(tx, idx, op.Intention) return legacyIntentionSetTxn(tx, idx, op.Intention)
@ -145,7 +145,7 @@ func txnLegacyIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
} }
// txnNode handles all Node-related operations. // txnNode handles all Node-related operations.
func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
var entry *structs.Node var entry *structs.Node
var err error var err error
@ -214,7 +214,7 @@ func (s *Store) txnNode(tx *txn, idx uint64, op *structs.TxnNodeOp) (structs.Txn
} }
// txnService handles all Service-related operations. // txnService handles all Service-related operations.
func (s *Store) txnService(tx *txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
switch op.Verb { switch op.Verb {
case api.ServiceGet: case api.ServiceGet:
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta) entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta)
@ -276,7 +276,7 @@ func newTxnResultFromNodeServiceEntry(entry *structs.NodeService) structs.TxnRes
} }
// txnCheck handles all Check-related operations. // txnCheck handles all Check-related operations.
func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
var entry *structs.HealthCheck var entry *structs.HealthCheck
var err error var err error
@ -338,7 +338,7 @@ func (s *Store) txnCheck(tx *txn, idx uint64, op *structs.TxnCheckOp) (structs.T
} }
// txnDispatch runs the given operations inside the state store transaction. // txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops)) results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops { for i, op := range ops {