Merge pull request #4869 from hashicorp/txn-checks

Add node/service/check operations to transaction api
This commit is contained in:
Kyle Havlovitz 2019-01-22 11:16:09 -08:00 committed by GitHub
commit 5bdf130767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 2397 additions and 280 deletions

View File

@ -1435,3 +1435,107 @@ func vetDeregisterWithACL(rule acl.Authorizer, subj *structs.DeregisterRequest,
return nil
}
// vetNodeTxnOp applies the given ACL policy to a node transaction operation.
func vetNodeTxnOp(op *structs.TxnNodeOp, rule acl.Authorizer) error {
// Fast path if ACLs are not enabled.
if rule == nil {
return nil
}
node := op.Node
n := &api.Node{
Node: node.Node,
ID: string(node.ID),
Address: node.Address,
Datacenter: node.Datacenter,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
}
// Sentinel doesn't apply to deletes, only creates/updates, so we don't need the scopeFn.
var scope func() map[string]interface{}
if op.Verb != api.NodeDelete && op.Verb != api.NodeDeleteCAS {
scope = func() map[string]interface{} {
return sentinel.ScopeCatalogUpsert(n, nil)
}
}
if rule != nil && !rule.NodeWrite(node.Node, scope) {
return acl.ErrPermissionDenied
}
return nil
}
// vetServiceTxnOp applies the given ACL policy to a service transaction operation.
func vetServiceTxnOp(op *structs.TxnServiceOp, rule acl.Authorizer) error {
// Fast path if ACLs are not enabled.
if rule == nil {
return nil
}
service := op.Service
n := &api.Node{Node: op.Node}
svc := &api.AgentService{
ID: service.ID,
Service: service.Service,
Tags: service.Tags,
Meta: service.Meta,
Address: service.Address,
Port: service.Port,
EnableTagOverride: service.EnableTagOverride,
}
var scope func() map[string]interface{}
if op.Verb != api.ServiceDelete && op.Verb != api.ServiceDeleteCAS {
scope = func() map[string]interface{} {
return sentinel.ScopeCatalogUpsert(n, svc)
}
}
if !rule.ServiceWrite(service.Service, scope) {
return acl.ErrPermissionDenied
}
return nil
}
// vetCheckTxnOp applies the given ACL policy to a check transaction operation.
func vetCheckTxnOp(op *structs.TxnCheckOp, rule acl.Authorizer) error {
// Fast path if ACLs are not enabled.
if rule == nil {
return nil
}
n := &api.Node{Node: op.Check.Node}
svc := &api.AgentService{
ID: op.Check.ServiceID,
Service: op.Check.ServiceID,
Tags: op.Check.ServiceTags,
}
var scope func() map[string]interface{}
if op.Check.ServiceID == "" {
// Node-level check.
if op.Verb == api.CheckDelete || op.Verb == api.CheckDeleteCAS {
scope = func() map[string]interface{} {
return sentinel.ScopeCatalogUpsert(n, svc)
}
}
if !rule.NodeWrite(op.Check.Node, scope) {
return acl.ErrPermissionDenied
}
} else {
// Service-level check.
if op.Verb == api.CheckDelete || op.Verb == api.CheckDeleteCAS {
scope = func() map[string]interface{} {
return sentinel.ScopeCatalogUpsert(n, svc)
}
}
if !rule.ServiceWrite(op.Check.ServiceName, scope) {
return acl.ErrPermissionDenied
}
}
return nil
}

View File

@ -20,6 +20,72 @@ type Catalog struct {
srv *Server
}
// nodePreApply does the verification of a node before it is applied to Raft.
func nodePreApply(nodeName, nodeID string) error {
if nodeName == "" {
return fmt.Errorf("Must provide node")
}
if nodeID != "" {
if _, err := uuid.ParseUUID(nodeID); err != nil {
return fmt.Errorf("Bad node ID: %v", err)
}
}
return nil
}
func servicePreApply(service *structs.NodeService, rule acl.Authorizer) error {
// Validate the service. This is in addition to the below since
// the above just hasn't been moved over yet. We should move it over
// in time.
if err := service.Validate(); err != nil {
return err
}
// If no service id, but service name, use default
if service.ID == "" && service.Service != "" {
service.ID = service.Service
}
// Verify ServiceName provided if ID.
if service.ID != "" && service.Service == "" {
return fmt.Errorf("Must provide service name with ID")
}
// Check the service address here and in the agent endpoint
// since service registration isn't synchronous.
if ipaddr.IsAny(service.Address) {
return fmt.Errorf("Invalid service address")
}
// Apply the ACL policy if any. The 'consul' service is excluded
// since it is managed automatically internally (that behavior
// is going away after version 0.8). We check this same policy
// later if version 0.8 is enabled, so we can eventually just
// delete this and do all the ACL checks down there.
if service.Service != structs.ConsulServiceName {
if rule != nil && !rule.ServiceWrite(service.Service, nil) {
return acl.ErrPermissionDenied
}
}
// Proxies must have write permission on their destination
if service.Kind == structs.ServiceKindConnectProxy {
if rule != nil && !rule.ServiceWrite(service.Proxy.DestinationServiceName, nil) {
return acl.ErrPermissionDenied
}
}
return nil
}
// checkPreApply does the verification of a check before it is applied to Raft.
func checkPreApply(check *structs.HealthCheck) {
if check.CheckID == "" && check.Name != "" {
check.CheckID = types.CheckID(check.Name)
}
}
// Register is used register that a node is providing a given service.
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if done, err := c.srv.forward("Catalog.Register", args, args, reply); done {
@ -27,67 +93,25 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
}
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())
// Verify the args.
if args.Node == "" {
return fmt.Errorf("Must provide node")
}
if args.Address == "" && !args.SkipNodeUpdate {
return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
}
if args.ID != "" {
if _, err := uuid.ParseUUID(string(args.ID)); err != nil {
return fmt.Errorf("Bad node ID: %v", err)
}
}
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
// Verify the args.
if err := nodePreApply(args.Node, string(args.ID)); err != nil {
return err
}
if args.Address == "" && !args.SkipNodeUpdate {
return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
}
// Handle a service registration.
if args.Service != nil {
// Validate the service. This is in addition to the below since
// the above just hasn't been moved over yet. We should move it over
// in time.
if err := args.Service.Validate(); err != nil {
if err := servicePreApply(args.Service, rule); err != nil {
return err
}
// If no service id, but service name, use default
if args.Service.ID == "" && args.Service.Service != "" {
args.Service.ID = args.Service.Service
}
// Verify ServiceName provided if ID.
if args.Service.ID != "" && args.Service.Service == "" {
return fmt.Errorf("Must provide service name with ID")
}
// Check the service address here and in the agent endpoint
// since service registration isn't synchronous.
if ipaddr.IsAny(args.Service.Address) {
return fmt.Errorf("Invalid service address")
}
// Apply the ACL policy if any. The 'consul' service is excluded
// since it is managed automatically internally (that behavior
// is going away after version 0.8). We check this same policy
// later if version 0.8 is enabled, so we can eventually just
// delete this and do all the ACL checks down there.
if args.Service.Service != structs.ConsulServiceName {
if rule != nil && !rule.ServiceWrite(args.Service.Service, nil) {
return acl.ErrPermissionDenied
}
}
// Proxies must have write permission on their destination
if args.Service.Kind == structs.ServiceKindConnectProxy {
if rule != nil && !rule.ServiceWrite(args.Service.Proxy.DestinationServiceName, nil) {
return acl.ErrPermissionDenied
}
}
}
// Move the old format single check into the slice, and fixup IDs.
@ -96,12 +120,10 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
args.Check = nil
}
for _, check := range args.Checks {
if check.CheckID == "" && check.Name != "" {
check.CheckID = types.CheckID(check.Name)
}
if check.Node == "" {
check.Node = args.Node
}
checkPreApply(check)
}
// Check the complete register request against the given ACL policy.

View File

@ -61,8 +61,18 @@ func (t *txnResultsFilter) Len() int {
func (t *txnResultsFilter) Filter(i int) bool {
result := t.results[i]
if result.KV != nil {
switch {
case result.KV != nil:
return !t.authorizer.KeyRead(result.KV.Key)
case result.Node != nil:
return !t.authorizer.NodeRead(result.Node.Node)
case result.Service != nil:
return !t.authorizer.ServiceRead(result.Service.Service)
case result.Check != nil:
if result.Check.ServiceName != "" {
return !t.authorizer.ServiceRead(result.Check.ServiceName)
}
return !t.authorizer.NodeRead(result.Check.Node)
}
return false
}

View File

@ -377,6 +377,35 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *memdb.Txn, node *structs.Node
return nil
}
// ensureNodeCASTxn updates a node only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) (bool, error) {
// Retrieve the existing entry.
existing, err := getNodeTxn(tx, node.Node)
if err != nil {
return false, err
}
// Check if the we should do the set. A ModifyIndex of 0 means that
// we are doing a set-if-not-exists.
if node.ModifyIndex == 0 && existing != nil {
return false, nil
}
if node.ModifyIndex != 0 && existing == nil {
return false, nil
}
if existing != nil && node.ModifyIndex != 0 && node.ModifyIndex != existing.ModifyIndex {
return false, nil
}
// Perform the update.
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
return false, err
}
return true, nil
}
// ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn.
@ -465,14 +494,22 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
idx := maxIndexTxn(tx, "nodes")
// Retrieve the node from the state store
node, err := tx.First("nodes", "id", id)
node, err := getNodeTxn(tx, id)
if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
if node != nil {
return idx, node.(*structs.Node), nil
return idx, node, nil
}
func getNodeTxn(tx *memdb.Txn, nodeName string) (*structs.Node, error) {
node, err := tx.First("nodes", "id", nodeName)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
}
return idx, nil, nil
if node != nil {
return node.(*structs.Node), nil
}
return nil, nil
}
func getNodeIDTxn(tx *memdb.Txn, id types.NodeID) (*structs.Node, error) {
@ -573,6 +610,34 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
return nil
}
// deleteNodeCASTxn is used to try doing a node delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteNodeCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName string) (bool, error) {
// Look up the node.
node, err := getNodeTxn(tx, nodeName)
if err != nil {
return false, err
}
if node == nil {
return false, nil
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
if node.ModifyIndex != cidx {
return false, nil
}
// Call the actual deletion if the above passed.
if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil {
return false, err
}
return true, nil
}
// deleteNodeTxn is the inner method used for removing a node from
// the store within a given transaction.
func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
@ -680,6 +745,36 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
return nil
}
// ensureServiceCASTxn updates a service only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) {
// Retrieve the existing service.
existing, err := tx.First("services", "id", node, svc.ID)
if err != nil {
return false, fmt.Errorf("failed service lookup: %s", err)
}
// Check if the we should do the set. A ModifyIndex of 0 means that
// we are doing a set-if-not-exists.
if svc.ModifyIndex == 0 && existing != nil {
return false, nil
}
if svc.ModifyIndex != 0 && existing == nil {
return false, nil
}
e, ok := existing.(*structs.Node)
if ok && svc.ModifyIndex != 0 && svc.ModifyIndex != e.ModifyIndex {
return false, nil
}
// Perform the update.
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil {
return false, err
}
return true, nil
}
// ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction.
func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
@ -1070,15 +1165,26 @@ func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs
idx := maxIndexTxn(tx, "services")
// Query the service
service, err := tx.First("services", "id", nodeName, serviceID)
service, err := s.getNodeServiceTxn(tx, nodeName, serviceID)
if err != nil {
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
if service != nil {
return idx, service.(*structs.ServiceNode).ToNodeService(), nil
return idx, service, nil
}
func (s *Store) getNodeServiceTxn(tx *memdb.Txn, nodeName, serviceID string) (*structs.NodeService, error) {
// Query the service
service, err := tx.First("services", "id", nodeName, serviceID)
if err != nil {
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
return idx, nil, nil
if service != nil {
return service.(*structs.ServiceNode).ToNodeService(), nil
}
return nil, nil
}
// NodeServices is used to query service registrations by node name or UUID.
@ -1173,6 +1279,34 @@ func serviceIndexName(name string) string {
return fmt.Sprintf("service.%s", name)
}
// deleteServiceCASTxn is used to try doing a service delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given service, then the call is a noop, otherwise a normal delete is invoked.
func (s *Store) deleteServiceCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName, serviceID string) (bool, error) {
// Look up the service.
service, err := s.getNodeServiceTxn(tx, nodeName, serviceID)
if err != nil {
return false, fmt.Errorf("service lookup failed: %s", err)
}
if service == nil {
return false, nil
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
if service.ModifyIndex != cidx {
return false, nil
}
// Call the actual deletion if the above passed.
if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil {
return false, err
}
return true, nil
}
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
@ -1273,6 +1407,35 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID
return nil
}
// ensureCheckCASTxn updates a check only if the existing index matches the given index.
// Returns a bool indicating if a write happened and any error.
func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) (bool, error) {
// Retrieve the existing entry.
_, existing, err := s.getNodeCheckTxn(tx, hc.Node, hc.CheckID)
if err != nil {
return false, fmt.Errorf("failed health check lookup: %s", err)
}
// Check if the we should do the set. A ModifyIndex of 0 means that
// we are doing a set-if-not-exists.
if hc.ModifyIndex == 0 && existing != nil {
return false, nil
}
if hc.ModifyIndex != 0 && existing == nil {
return false, nil
}
if existing != nil && hc.ModifyIndex != 0 && hc.ModifyIndex != existing.ModifyIndex {
return false, nil
}
// Perform the update.
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
return false, err
}
return true, nil
}
// ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
@ -1389,6 +1552,12 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *stru
tx := s.db.Txn(false)
defer tx.Abort()
return s.getNodeCheckTxn(tx, nodeName, checkID)
}
// nodeCheckTxn is used as the inner method to handle reading a health check
// from the state store.
func (s *Store) getNodeCheckTxn(tx *memdb.Txn, nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
// Get the table index.
idx := maxIndexTxn(tx, "checks")
@ -1578,6 +1747,34 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) erro
return nil
}
// deleteCheckCASTxn is used to try doing a check delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
func (s *Store) deleteCheckCASTxn(tx *memdb.Txn, idx, cidx uint64, node string, checkID types.CheckID) (bool, error) {
// Try to retrieve the existing health check.
_, hc, err := s.getNodeCheckTxn(tx, node, checkID)
if err != nil {
return false, fmt.Errorf("check lookup failed: %s", err)
}
if hc == nil {
return false, nil
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
if hc.ModifyIndex != cidx {
return false, nil
}
// Call the actual deletion if the above passed.
if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil {
return false, err
}
return true, nil
}
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {

View File

@ -118,10 +118,200 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention
case structs.IntentionOpDelete:
return s.intentionDeleteTxn(tx, idx, op.Intention.ID)
default:
return fmt.Errorf("unknown Intention verb %q", op.Op)
return fmt.Errorf("unknown Intention op %q", op.Op)
}
}
// txnNode handles all Node-related operations.
func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
var entry *structs.Node
var err error
getNode := func() (*structs.Node, error) {
if op.Node.ID != "" {
return getNodeIDTxn(tx, op.Node.ID)
} else {
return getNodeTxn(tx, op.Node.Node)
}
}
switch op.Verb {
case api.NodeGet:
entry, err = getNode()
if entry == nil && err == nil {
err = fmt.Errorf("node %q doesn't exist", op.Node.Node)
}
case api.NodeSet:
err = s.ensureNodeTxn(tx, idx, &op.Node)
if err == nil {
entry, err = getNode()
}
case api.NodeCAS:
var ok bool
ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node)
if !ok && err == nil {
err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node)
break
}
entry, err = getNode()
case api.NodeDelete:
err = s.deleteNodeTxn(tx, idx, op.Node.Node)
case api.NodeDeleteCAS:
var ok bool
ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node)
if !ok && err == nil {
err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node)
}
default:
err = fmt.Errorf("unknown Node verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.NodeGet {
result := structs.TxnResult{Node: entry}
return structs.TxnResults{&result}, nil
}
clone := *entry
result := structs.TxnResult{Node: &clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnService handles all Service-related operations.
func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
var entry *structs.NodeService
var err error
switch op.Verb {
case api.ServiceGet:
entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID)
if entry == nil && err == nil {
err = fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node)
}
case api.ServiceSet:
err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service)
entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID)
case api.ServiceCAS:
var ok bool
ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service)
if !ok && err == nil {
err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node)
break
}
entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID)
case api.ServiceDelete:
err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID)
case api.ServiceDeleteCAS:
var ok bool
ok, err = s.deleteServiceCASTxn(tx, idx, op.Service.ModifyIndex, op.Node, op.Service.ID)
if !ok && err == nil {
err = fmt.Errorf("failed to delete service %q on node %q, index is stale", op.Service.ID, op.Node)
}
default:
err = fmt.Errorf("unknown Service verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.ServiceGet {
result := structs.TxnResult{Service: entry}
return structs.TxnResults{&result}, nil
}
clone := *entry
result := structs.TxnResult{Service: &clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnCheck handles all Check-related operations.
func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
var entry *structs.HealthCheck
var err error
switch op.Verb {
case api.CheckGet:
_, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID)
if entry == nil && err == nil {
err = fmt.Errorf("check %q on node %q doesn't exist", op.Check.CheckID, op.Check.Node)
}
case api.CheckSet:
err = s.ensureCheckTxn(tx, idx, &op.Check)
if err == nil {
_, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID)
}
case api.CheckCAS:
var ok bool
entry = &op.Check
ok, err = s.ensureCheckCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set check %q on node %q, index is stale", entry.CheckID, entry.Node)
break
}
_, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID)
case api.CheckDelete:
err = s.deleteCheckTxn(tx, idx, op.Check.Node, op.Check.CheckID)
case api.CheckDeleteCAS:
var ok bool
ok, err = s.deleteCheckCASTxn(tx, idx, op.Check.ModifyIndex, op.Check.Node, op.Check.CheckID)
if !ok && err == nil {
err = fmt.Errorf("failed to delete check %q on node %q, index is stale", op.Check.CheckID, op.Check.Node)
}
default:
err = fmt.Errorf("unknown Check verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.CheckGet {
result := structs.TxnResult{Check: entry}
return structs.TxnResults{&result}, nil
}
clone := entry.Clone()
result := structs.TxnResult{Check: clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
@ -136,6 +326,12 @@ func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (stru
ret, err = s.txnKVS(tx, idx, op.KV)
case op.Intention != nil:
err = s.txnIntention(tx, idx, op.Intention)
case op.Node != nil:
ret, err = s.txnNode(tx, idx, op.Node)
case op.Service != nil:
ret, err = s.txnService(tx, idx, op.Service)
case op.Check != nil:
ret, err = s.txnCheck(tx, idx, op.Check)
default:
err = fmt.Errorf("no operation specified")
}

View File

@ -1,12 +1,14 @@
package state
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/require"
)
@ -116,6 +118,384 @@ func TestStateStore_Txn_Intention(t *testing.T) {
verify.Values(t, "", actual, intentions)
}
func TestStateStore_Txn_Node(t *testing.T) {
require := require.New(t)
s := testStateStore(t)
// Create some nodes.
var nodes [5]structs.Node
for i := 0; i < len(nodes); i++ {
nodes[i] = structs.Node{
Node: fmt.Sprintf("node%d", i+1),
ID: types.NodeID(testUUID()),
}
// Leave node5 to be created by an operation.
if i < 5 {
s.EnsureNode(uint64(i+1), &nodes[i])
}
}
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeGet,
Node: nodes[0],
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeSet,
Node: nodes[4],
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeCAS,
Node: structs.Node{
Node: "node2",
ID: nodes[1].ID,
Datacenter: "dc2",
RaftIndex: structs.RaftIndex{ModifyIndex: 2},
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeDelete,
Node: structs.Node{Node: "node3"},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeDeleteCAS,
Node: structs.Node{
Node: "node4",
RaftIndex: structs.RaftIndex{ModifyIndex: 4},
},
},
},
}
results, errors := s.TxnRW(8, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
// Make sure the response looks as expected.
nodes[1].Datacenter = "dc2"
nodes[1].ModifyIndex = 8
expected := structs.TxnResults{
&structs.TxnResult{
Node: &nodes[0],
},
&structs.TxnResult{
Node: &nodes[4],
},
&structs.TxnResult{
Node: &nodes[1],
},
}
verify.Values(t, "", results, expected)
// Pull the resulting state store contents.
idx, actual, err := s.Nodes(nil)
require.NoError(err)
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
expectedNodes := structs.Nodes{&nodes[0], &nodes[1], &nodes[4]}
verify.Values(t, "", actual, expectedNodes)
}
func TestStateStore_Txn_Service(t *testing.T) {
require := require.New(t)
s := testStateStore(t)
testRegisterNode(t, s, 1, "node1")
// Create some services.
for i := 1; i <= 4; i++ {
testRegisterService(t, s, uint64(i+1), "node1", fmt.Sprintf("svc%d", i))
}
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceGet,
Node: "node1",
Service: structs.NodeService{ID: "svc1"},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceSet,
Node: "node1",
Service: structs.NodeService{ID: "svc5"},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceCAS,
Node: "node1",
Service: structs.NodeService{
ID: "svc2",
Tags: []string{"modified"},
RaftIndex: structs.RaftIndex{ModifyIndex: 3},
},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceDelete,
Node: "node1",
Service: structs.NodeService{ID: "svc3"},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceDeleteCAS,
Node: "node1",
Service: structs.NodeService{
ID: "svc4",
RaftIndex: structs.RaftIndex{ModifyIndex: 5},
},
},
},
}
results, errors := s.TxnRW(6, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
Service: &structs.NodeService{
ID: "svc1",
Service: "svc1",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
Service: &structs.NodeService{
ID: "svc5",
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 6,
ModifyIndex: 6,
},
},
},
&structs.TxnResult{
Service: &structs.NodeService{
ID: "svc2",
Tags: []string{"modified"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 6,
},
},
},
}
verify.Values(t, "", results, expected)
// Pull the resulting state store contents.
idx, actual, err := s.NodeServices(nil, "node1")
require.NoError(err)
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
expectedServices := &structs.NodeServices{
Node: &structs.Node{
Node: "node1",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
Services: map[string]*structs.NodeService{
"svc1": &structs.NodeService{
ID: "svc1",
Service: "svc1",
Address: "1.1.1.1",
Port: 1111,
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
"svc5": &structs.NodeService{
ID: "svc5",
RaftIndex: structs.RaftIndex{
CreateIndex: 6,
ModifyIndex: 6,
},
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
"svc2": &structs.NodeService{
ID: "svc2",
Tags: []string{"modified"},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 6,
},
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
},
}
verify.Values(t, "", actual, expectedServices)
}
func TestStateStore_Txn_Checks(t *testing.T) {
require := require.New(t)
s := testStateStore(t)
testRegisterNode(t, s, 1, "node1")
// Create some checks.
for i := 1; i <= 4; i++ {
testRegisterCheck(t, s, uint64(i+1), "node1", "", types.CheckID(fmt.Sprintf("check%d", i)), "failing")
}
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckGet,
Check: structs.HealthCheck{Node: "node1", CheckID: "check1"},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckSet,
Check: structs.HealthCheck{Node: "node1", CheckID: "check5", Status: "passing"},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckCAS,
Check: structs.HealthCheck{
Node: "node1",
CheckID: "check2",
Status: "warning",
RaftIndex: structs.RaftIndex{ModifyIndex: 3},
},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckDelete,
Check: structs.HealthCheck{Node: "node1", CheckID: "check3"},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckDeleteCAS,
Check: structs.HealthCheck{
Node: "node1",
CheckID: "check4",
RaftIndex: structs.RaftIndex{ModifyIndex: 5},
},
},
},
}
results, errors := s.TxnRW(6, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
Check: &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Status: "failing",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
Check: &structs.HealthCheck{
Node: "node1",
CheckID: "check5",
Status: "passing",
RaftIndex: structs.RaftIndex{
CreateIndex: 6,
ModifyIndex: 6,
},
},
},
&structs.TxnResult{
Check: &structs.HealthCheck{
Node: "node1",
CheckID: "check2",
Status: "warning",
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 6,
},
},
},
}
verify.Values(t, "", results, expected)
// Pull the resulting state store contents.
idx, actual, err := s.NodeChecks(nil, "node1")
require.NoError(err)
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
expectedChecks := structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Status: "failing",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
&structs.HealthCheck{
Node: "node1",
CheckID: "check2",
Status: "warning",
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 6,
},
},
&structs.HealthCheck{
Node: "node1",
CheckID: "check5",
Status: "passing",
RaftIndex: structs.RaftIndex{
CreateIndex: 6,
ModifyIndex: 6,
},
},
}
verify.Values(t, "", actual, expectedChecks)
}
func TestStateStore_Txn_KVS(t *testing.T) {
s := testStateStore(t)

View File

@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// Txn endpoint is used to perform multi-object atomic transactions.
@ -21,7 +22,8 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
// Perform the pre-apply checks for any KV operations.
for i, op := range ops {
if op.KV != nil {
switch {
case op.KV != nil:
ok, err := kvsPreApply(t.srv, authorizer, op.KV.Verb, &op.KV.DirEnt)
if err != nil {
errors = append(errors, &structs.TxnError{
@ -35,6 +37,65 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
What: err.Error(),
})
}
case op.Node != nil:
// Skip the pre-apply checks if this is a GET.
if op.Node.Verb == api.NodeGet {
break
}
node := op.Node.Node
if err := nodePreApply(node.Node, string(node.ID)); err != nil {
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
break
}
// Check that the token has permissions for the given operation.
if err := vetNodeTxnOp(op.Node, authorizer); err != nil {
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
case op.Service != nil:
// Skip the pre-apply checks if this is a GET.
if op.Service.Verb == api.ServiceGet {
break
}
service := &op.Service.Service
if err := servicePreApply(service, nil); err != nil {
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
break
}
// Check that the token has permissions for the given operation.
if err := vetServiceTxnOp(op.Service, authorizer); err != nil {
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
case op.Check != nil:
// Skip the pre-apply checks if this is a GET.
if op.Check.Verb == api.CheckGet {
break
}
checkPreApply(&op.Check.Check)
// Check that the token has permissions for the given operation.
if err := vetCheckTxnOp(op.Check, authorizer); err != nil {
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
}
}

View File

@ -12,9 +12,49 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/require"
)
var testTxnRules = `
key "" {
policy = "deny"
}
key "foo" {
policy = "read"
}
key "test" {
policy = "write"
}
key "test/priv" {
policy = "read"
}
service "" {
policy = "deny"
}
service "foo-svc" {
policy = "read"
}
service "test-svc" {
policy = "write"
}
node "" {
policy = "deny"
}
node "foo-node" {
policy = "read"
}
node "test-node" {
policy = "write"
}
`
var testNodeID = "9749a7df-fac5-46b4-8078-32a3d96c59f3"
func TestTxn_CheckNotExists(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
@ -101,12 +141,76 @@ func TestTxn_Apply(t *testing.T) {
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeSet,
Node: structs.Node{
ID: types.NodeID(testNodeID),
Node: "foo",
Address: "127.0.0.1",
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeGet,
Node: structs.Node{
ID: types.NodeID(testNodeID),
Node: "foo",
},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceSet,
Node: "foo",
Service: structs.NodeService{
ID: "svc-foo",
Service: "svc-foo",
Address: "1.1.1.1",
},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceGet,
Node: "foo",
Service: structs.NodeService{
ID: "svc-foo",
Service: "svc-foo",
},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckSet,
Check: structs.HealthCheck{
Node: "foo",
CheckID: types.CheckID("check-foo"),
Name: "test",
Status: "passing",
},
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckGet,
Check: structs.HealthCheck{
Node: "foo",
CheckID: types.CheckID("check-foo"),
Name: "test",
},
},
},
},
}
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Errors) != 0 {
t.Fatalf("errs: %v", out.Errors)
}
// Verify the state store directly.
state := s1.fsm.State()
@ -122,6 +226,30 @@ func TestTxn_Apply(t *testing.T) {
t.Fatalf("bad: %v", d)
}
_, n, err := state.GetNode("foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if n.Node != "foo" || n.Address != "127.0.0.1" {
t.Fatalf("bad: %v", err)
}
_, s, err := state.NodeService("foo", "svc-foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if s.ID != "svc-foo" || s.Address != "1.1.1.1" {
t.Fatalf("bad: %v", err)
}
_, c, err := state.NodeCheck("foo", types.CheckID("check-foo"))
if err != nil {
t.Fatalf("err: %v", err)
}
if c.CheckID != "check-foo" || c.Status != "passing" || c.Name != "test" {
t.Fatalf("bad: %v", err)
}
// Verify the transaction's return value.
expected := structs.TxnResponse{
Results: structs.TxnResults{
@ -147,15 +275,34 @@ func TestTxn_Apply(t *testing.T) {
},
},
},
&structs.TxnResult{
Node: n,
},
&structs.TxnResult{
Node: n,
},
&structs.TxnResult{
Service: s,
},
&structs.TxnResult{
Service: s,
},
&structs.TxnResult{
Check: c,
},
&structs.TxnResult{
Check: c,
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
verify.Values(t, "", out, expected)
}
func TestTxn_Apply_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
@ -167,15 +314,25 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
// Set up some state to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "nope",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
require.NoError(state.KVSSet(1, d))
node := &structs.Node{
ID: types.NodeID(testNodeID),
Node: "nope",
}
require.NoError(state.EnsureNode(2, node))
svc := structs.NodeService{ID: "nope", Service: "nope", Address: "127.0.0.1"}
require.NoError(state.EnsureService(3, "nope", &svc))
check := structs.HealthCheck{Node: "nope", CheckID: types.CheckID("nope")}
state.EnsureCheck(4, &check)
// Create the ACL.
var id string
@ -186,7 +343,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testListRules,
Rules: testTxnRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
@ -296,6 +453,101 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeGet,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeSet,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeCAS,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeDelete,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeDeleteCAS,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceGet,
Node: "foo-node",
Service: svc,
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceSet,
Node: "foo-node",
Service: svc,
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceCAS,
Node: "foo-node",
Service: svc,
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceDelete,
Node: "foo-node",
Service: svc,
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceDeleteCAS,
Node: "foo-node",
Service: svc,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckGet,
Check: check,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckSet,
Check: check,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckCAS,
Check: check,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckDelete,
Check: check,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckDeleteCAS,
Check: check,
},
},
},
WriteRequest: structs.WriteRequest{
Token: id,
@ -309,20 +561,55 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
// Verify the transaction's return value.
var expected structs.TxnResponse
for i, op := range arg.Ops {
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
switch {
case op.KV != nil:
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Node != nil:
switch op.Node.Verb {
case api.NodeGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Service != nil:
switch op.Service.Verb {
case api.ServiceGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Check != nil:
switch op.Check.Verb {
case api.CheckGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
}
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
verify.Values(t, "", out, expected)
}
func TestTxn_Apply_LockDelay(t *testing.T) {
@ -413,6 +700,9 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
func TestTxn_Read(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -431,6 +721,19 @@ func TestTxn_Read(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Put in a node/check/service to read back.
node := &structs.Node{
ID: types.NodeID(testNodeID),
Node: "foo",
}
require.NoError(state.EnsureNode(2, node))
svc := structs.NodeService{ID: "svc-foo", Service: "svc-foo", Address: "127.0.0.1"}
require.NoError(state.EnsureService(3, "foo", &svc))
check := structs.HealthCheck{Node: "foo", CheckID: types.CheckID("check-foo")}
state.EnsureCheck(4, &check)
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
@ -445,6 +748,25 @@ func TestTxn_Read(t *testing.T) {
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeGet,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceGet,
Node: "foo",
Service: svc,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckGet,
Check: check,
},
},
},
}
var out structs.TxnReadResponse
@ -453,6 +775,8 @@ func TestTxn_Read(t *testing.T) {
}
// Verify the transaction's return value.
svc.Weights = &structs.Weights{Passing: 1, Warning: 1}
svc.RaftIndex = structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3}
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
@ -466,19 +790,29 @@ func TestTxn_Read(t *testing.T) {
},
},
},
&structs.TxnResult{
Node: node,
},
&structs.TxnResult{
Service: &svc,
},
&structs.TxnResult{
Check: &check,
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
verify.Values(t, "", out, expected)
}
func TestTxn_Read_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
@ -502,6 +836,19 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Put in a node/check/service to read back.
node := &structs.Node{
ID: types.NodeID(testNodeID),
Node: "nope",
}
require.NoError(state.EnsureNode(2, node))
svc := structs.NodeService{ID: "nope", Service: "nope", Address: "127.0.0.1"}
require.NoError(state.EnsureService(3, "nope", &svc))
check := structs.HealthCheck{Node: "nope", CheckID: types.CheckID("nope")}
state.EnsureCheck(4, &check)
// Create the ACL.
var id string
{
@ -511,7 +858,7 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: testListRules,
Rules: testTxnRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
@ -557,6 +904,25 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
},
},
},
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: api.NodeGet,
Node: structs.Node{ID: node.ID, Node: node.Node},
},
},
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: api.ServiceGet,
Node: "foo",
Service: svc,
},
},
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: api.CheckGet,
Check: check,
},
},
},
QueryOptions: structs.QueryOptions{
Token: id,
@ -574,15 +940,51 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
},
}
for i, op := range arg.Ops {
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
switch {
case op.KV != nil:
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Node != nil:
switch op.Node.Verb {
case api.NodeGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Service != nil:
switch op.Service.Verb {
case api.ServiceGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
case op.Check != nil:
switch op.Check.Verb {
case api.CheckGet:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
}
}
if !reflect.DeepEqual(out, expected) {

View File

@ -508,7 +508,18 @@ func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error)
return err
}
}
return mapstructure.Decode(raw, out)
decodeConf := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: &out,
}
decoder, err := mapstructure.NewDecoder(decodeConf)
if err != nil {
return err
}
return decoder.Decode(raw)
}
// setTranslateAddr is used to set the address translation header. This is only

View File

@ -2,6 +2,7 @@ package structs
import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"reflect"
@ -906,14 +907,72 @@ type HealthCheck struct {
}
type HealthCheckDefinition struct {
HTTP string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Interval api.ReadableDuration `json:",omitempty"`
Timeout api.ReadableDuration `json:",omitempty"`
DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"`
HTTP string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Interval time.Duration `json:",omitempty"`
Timeout time.Duration `json:",omitempty"`
DeregisterCriticalServiceAfter time.Duration `json:",omitempty"`
}
func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {
type Alias HealthCheckDefinition
exported := &struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Interval: d.Interval.String(),
Timeout: d.Timeout.String(),
DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(),
Alias: (*Alias)(d),
}
if d.Interval == 0 {
exported.Interval = ""
}
if d.Timeout == 0 {
exported.Timeout = ""
}
if d.DeregisterCriticalServiceAfter == 0 {
exported.DeregisterCriticalServiceAfter = ""
}
return json.Marshal(exported)
}
func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error {
type Alias HealthCheckDefinition
aux := &struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Alias: (*Alias)(d),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.Interval != "" {
if d.Interval, err = time.ParseDuration(aux.Interval); err != nil {
return err
}
}
if aux.Timeout != "" {
if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil {
return err
}
}
if aux.DeregisterCriticalServiceAfter != "" {
if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil {
return err
}
}
return nil
}
// IsSame checks if one HealthCheck is the same as another, without looking
@ -929,7 +988,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
c.Output != other.Output ||
c.ServiceID != other.ServiceID ||
c.ServiceName != other.ServiceName ||
!reflect.DeepEqual(c.ServiceTags, other.ServiceTags) {
!reflect.DeepEqual(c.ServiceTags, other.ServiceTags) ||
!reflect.DeepEqual(c.Definition, other.Definition) {
return false
}

View File

@ -9,7 +9,7 @@ import (
)
// TxnKVOp is used to define a single operation on the KVS inside a
// transaction
// transaction.
type TxnKVOp struct {
Verb api.KVOp
DirEnt DirEntry
@ -19,6 +19,40 @@ type TxnKVOp struct {
// inside a transaction.
type TxnKVResult *DirEntry
// TxnNodeOp is used to define a single operation on a node in the catalog inside
// a transaction.
type TxnNodeOp struct {
Verb api.NodeOp
Node Node
}
// TxnNodeResult is used to define the result of a single operation on a node
// in the catalog inside a transaction.
type TxnNodeResult *Node
// TxnServiceOp is used to define a single operation on a service in the catalog inside
// a transaction.
type TxnServiceOp struct {
Verb api.ServiceOp
Node string
Service NodeService
}
// TxnServiceResult is used to define the result of a single operation on a service
// in the catalog inside a transaction.
type TxnServiceResult *NodeService
// TxnCheckOp is used to define a single operation on a health check inside a
// transaction.
type TxnCheckOp struct {
Verb api.CheckOp
Check HealthCheck
}
// TxnCheckResult is used to define the result of a single operation on a health
// check inside a transaction.
type TxnCheckResult *HealthCheck
// TxnKVOp is used to define a single operation on an Intention inside a
// transaction.
type TxnIntentionOp IntentionRequest
@ -28,6 +62,9 @@ type TxnIntentionOp IntentionRequest
type TxnOp struct {
KV *TxnKVOp
Intention *TxnIntentionOp
Node *TxnNodeOp
Service *TxnServiceOp
Check *TxnCheckOp
}
// TxnOps is a list of operations within a transaction.
@ -75,7 +112,10 @@ type TxnErrors []*TxnError
// TxnResult is used to define the result of a given operation inside a
// transaction. Only one of the types should be filled out per entry.
type TxnResult struct {
KV TxnKVResult
KV TxnKVResult `json:",omitempty"`
Node TxnNodeResult `json:",omitempty"`
Service TxnServiceResult `json:",omitempty"`
Check TxnCheckResult `json:",omitempty"`
}
// TxnResults is a list of TxnResult entries.

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)
const (
@ -48,9 +49,9 @@ func decodeValue(rawKV interface{}) error {
return nil
}
// fixupKVOp looks for non-nil KV operations and passes them on for
// fixupTxnOp looks for non-nil Txn operations and passes them on for
// value conversion.
func fixupKVOp(rawOp interface{}) error {
func fixupTxnOp(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
@ -67,15 +68,15 @@ func fixupKVOp(rawOp interface{}) error {
return nil
}
// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops,
// fixupTxnOps takes the raw decoded JSON and base64 decodes values in Txn ops,
// replacing them with byte arrays.
func fixupKVOps(raw interface{}) error {
func fixupTxnOps(raw interface{}) error {
rawSlice, ok := raw.([]interface{})
if !ok {
return fmt.Errorf("unexpected raw type: %t", raw)
}
for _, rawOp := range rawSlice {
if err := fixupKVOp(rawOp); err != nil {
if err := fixupTxnOp(rawOp); err != nil {
return err
}
}
@ -100,7 +101,7 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var ops api.TxnOps
if err := decodeBody(req, &ops, fixupKVOps); err != nil {
if err := decodeBody(req, &ops, fixupTxnOps); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Failed to parse body: %v", err)
return nil, 0, false
@ -123,7 +124,8 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
var writes int
var netKVSize int
for _, in := range ops {
if in.KV != nil {
switch {
case in.KV != nil:
size := len(in.KV.Value)
if size > maxKVSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
@ -152,6 +154,102 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
},
}
opsRPC = append(opsRPC, out)
case in.Node != nil:
if in.Node.Verb != api.NodeGet {
writes++
}
// Setup the default DC if not provided
if in.Node.Node.Datacenter == "" {
in.Node.Node.Datacenter = s.agent.config.Datacenter
}
node := in.Node.Node
out := &structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: in.Node.Verb,
Node: structs.Node{
ID: types.NodeID(node.ID),
Node: node.Node,
Address: node.Address,
Datacenter: node.Datacenter,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
RaftIndex: structs.RaftIndex{
ModifyIndex: node.ModifyIndex,
},
},
},
}
opsRPC = append(opsRPC, out)
case in.Service != nil:
if in.Service.Verb != api.ServiceGet {
writes++
}
svc := in.Service.Service
out := &structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: in.Service.Verb,
Node: in.Service.Node,
Service: structs.NodeService{
ID: svc.ID,
Service: svc.Service,
Tags: svc.Tags,
Address: svc.Address,
Meta: svc.Meta,
Port: svc.Port,
Weights: &structs.Weights{
Passing: svc.Weights.Passing,
Warning: svc.Weights.Warning,
},
EnableTagOverride: svc.EnableTagOverride,
RaftIndex: structs.RaftIndex{
ModifyIndex: svc.ModifyIndex,
},
},
},
}
opsRPC = append(opsRPC, out)
case in.Check != nil:
if in.Check.Verb != api.CheckGet {
writes++
}
check := in.Check.Check
out := &structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: in.Check.Verb,
Check: structs.HealthCheck{
Node: check.Node,
CheckID: types.CheckID(check.CheckID),
Name: check.Name,
Status: check.Status,
Notes: check.Notes,
Output: check.Output,
ServiceID: check.ServiceID,
ServiceName: check.ServiceName,
ServiceTags: check.ServiceTags,
Definition: structs.HealthCheckDefinition{
HTTP: check.Definition.HTTP,
TLSSkipVerify: check.Definition.TLSSkipVerify,
Header: check.Definition.Header,
Method: check.Definition.Method,
TCP: check.Definition.TCP,
Interval: check.Definition.Interval,
Timeout: check.Definition.Timeout,
DeregisterCriticalServiceAfter: check.Definition.DeregisterCriticalServiceAfter,
},
RaftIndex: structs.RaftIndex{
ModifyIndex: check.ModifyIndex,
},
},
},
}
opsRPC = append(opsRPC, out)
}
}

View File

@ -1,8 +1,10 @@
package api
import (
"encoding/json"
"fmt"
"strings"
"time"
)
const (
@ -36,6 +38,9 @@ type HealthCheck struct {
ServiceTags []string
Definition HealthCheckDefinition
CreateIndex uint64
ModifyIndex uint64
}
// HealthCheckDefinition is used to store the details about
@ -46,9 +51,56 @@ type HealthCheckDefinition struct {
Method string
TLSSkipVerify bool
TCP string
Interval ReadableDuration
Timeout ReadableDuration
DeregisterCriticalServiceAfter ReadableDuration
Interval time.Duration
Timeout time.Duration
DeregisterCriticalServiceAfter time.Duration
}
func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {
type Alias HealthCheckDefinition
return json.Marshal(&struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Interval: d.Interval.String(),
Timeout: d.Timeout.String(),
DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(),
Alias: (*Alias)(d),
})
}
func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error {
type Alias HealthCheckDefinition
aux := &struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Alias: (*Alias)(d),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.Interval != "" {
if d.Interval, err = time.ParseDuration(aux.Interval); err != nil {
return err
}
}
if aux.Timeout != "" {
if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil {
return err
}
}
if aux.DeregisterCriticalServiceAfter != "" {
if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil {
return err
}
}
return nil
}
// HealthChecks is a collection of HealthCheck structs.

View File

@ -213,9 +213,9 @@ func TestAPI_HealthChecks(t *testing.T) {
if meta.LastIndex == 0 {
r.Fatalf("bad: %v", meta)
}
if got, want := out, checks; !verify.Values(t, "checks", got, want) {
r.Fatal("health.Checks failed")
}
checks[0].CreateIndex = out[0].CreateIndex
checks[0].ModifyIndex = out[0].ModifyIndex
verify.Values(r, "checks", out, checks)
})
}

160
api/kv.go
View File

@ -45,44 +45,6 @@ type KVPair struct {
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair
// KVOp constants give possible operations available in a KVTxn.
type KVOp string
const (
KVSet KVOp = "set"
KVDelete KVOp = "delete"
KVDeleteCAS KVOp = "delete-cas"
KVDeleteTree KVOp = "delete-tree"
KVCAS KVOp = "cas"
KVLock KVOp = "lock"
KVUnlock KVOp = "unlock"
KVGet KVOp = "get"
KVGetTree KVOp = "get-tree"
KVCheckSession KVOp = "check-session"
KVCheckIndex KVOp = "check-index"
KVCheckNotExists KVOp = "check-not-exists"
)
// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb KVOp
Key string
Value []byte
Flags uint64
Index uint64
Session string
}
// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp
// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}
// KV is used to manipulate the K/V API
type KV struct {
c *Client
@ -300,121 +262,25 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
return res, qm, nil
}
// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KV *KVTxnOp
}
// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp
// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
}
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
// The Txn function has been deprecated from the KV object; please see the Txn
// object for more information about Transactions.
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setQueryOptions(q)
// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvOp := range txn {
ops = append(ops, &TxnOp{KV: kvOp})
var ops TxnOps
for _, op := range txn {
ops = append(ops, &TxnOp{KV: op})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r)
respOk, txnResp, qm, err := k.c.txn(ops, q)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}
// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
return respOk, &kvResp, qm, nil
}

View File

@ -456,7 +456,7 @@ func TestAPI_ClientAcquireRelease(t *testing.T) {
}
}
func TestAPI_ClientTxn(t *testing.T) {
func TestAPI_KVClientTxn(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

230
api/txn.go Normal file
View File

@ -0,0 +1,230 @@
package api
import (
"bytes"
"fmt"
"io"
"net/http"
)
// Txn is used to manipulate the Txn API
type Txn struct {
c *Client
}
// Txn is used to return a handle to the K/V apis
func (c *Client) Txn() *Txn {
return &Txn{c}
}
// TxnOp is the internal format we send to Consul. Currently only K/V and
// check operations are supported.
type TxnOp struct {
KV *KVTxnOp
Node *NodeTxnOp
Service *ServiceTxnOp
Check *CheckTxnOp
}
// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp
// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
Node *Node
Service *CatalogService
Check *HealthCheck
}
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// KVOp constants give possible operations available in a transaction.
type KVOp string
const (
KVSet KVOp = "set"
KVDelete KVOp = "delete"
KVDeleteCAS KVOp = "delete-cas"
KVDeleteTree KVOp = "delete-tree"
KVCAS KVOp = "cas"
KVLock KVOp = "lock"
KVUnlock KVOp = "unlock"
KVGet KVOp = "get"
KVGetTree KVOp = "get-tree"
KVCheckSession KVOp = "check-session"
KVCheckIndex KVOp = "check-index"
KVCheckNotExists KVOp = "check-not-exists"
)
// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb KVOp
Key string
Value []byte
Flags uint64
Index uint64
Session string
}
// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp
// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}
// NodeOp constants give possible operations available in a transaction.
type NodeOp string
const (
NodeGet NodeOp = "get"
NodeSet NodeOp = "set"
NodeCAS NodeOp = "cas"
NodeDelete NodeOp = "delete"
NodeDeleteCAS NodeOp = "delete-cas"
)
// NodeTxnOp defines a single operation inside a transaction.
type NodeTxnOp struct {
Verb NodeOp
Node Node
}
// ServiceOp constants give possible operations available in a transaction.
type ServiceOp string
const (
ServiceGet ServiceOp = "get"
ServiceSet ServiceOp = "set"
ServiceCAS ServiceOp = "cas"
ServiceDelete ServiceOp = "delete"
ServiceDeleteCAS ServiceOp = "delete-cas"
)
// ServiceTxnOp defines a single operation inside a transaction.
type ServiceTxnOp struct {
Verb ServiceOp
Node string
Service AgentService
}
// CheckOp constants give possible operations available in a transaction.
type CheckOp string
const (
CheckGet CheckOp = "get"
CheckSet CheckOp = "set"
CheckCAS CheckOp = "cas"
CheckDelete CheckOp = "delete"
CheckDeleteCAS CheckOp = "delete-cas"
)
// CheckTxnOp defines a single operation inside a transaction.
type CheckTxnOp struct {
Verb CheckOp
Check HealthCheck
}
// Txn is used to apply multiple Consul operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the different fields in the TxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// &CheckTxnOp{
// Verb: CheckSet,
// HealthCheck: HealthCheck{
// Node: "foo",
// CheckID: "redis:a",
// Name: "Redis Health Check",
// Status: "passing",
// },
// }
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. For KV operations, Deleted keys will have a nil entry in the
// results, and to save space, the Value of each key in the Results will be nil
// unless the operation is a KVGet. If the transaction was rolled back, the Errors
// member will have entries referencing the index of the operation that failed
// along with an error message.
func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
return t.c.txn(txn, q)
}
func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
r := c.newRequest("PUT", "/v1/txn")
r.setQueryOptions(q)
r.obj = txn
rtt, resp, err := c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}
return resp.StatusCode == http.StatusOK, &txnResp, qm, nil
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}

247
api/txn_test.go Normal file
View File

@ -0,0 +1,247 @@
package api
import (
"strings"
"testing"
"time"
"github.com/hashicorp/go-uuid"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/require"
)
func TestAPI_ClientTxn(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
txn := c.Txn()
// Set up a test service and health check.
nodeID, err := uuid.GenerateUUID()
require.NoError(err)
catalog := c.Catalog()
reg := &CatalogRegistration{
ID: nodeID,
Node: "foo",
Address: "2.2.2.2",
Service: &AgentService{
ID: "foo1",
Service: "foo",
},
Check: &AgentCheck{
CheckID: "bar",
Status: "critical",
Definition: HealthCheckDefinition{
TCP: "1.1.1.1",
Interval: 5 * time.Second,
},
},
}
_, err = catalog.Register(reg, nil)
require.NoError(err)
node, _, err := catalog.Node("foo", nil)
require.NoError(err)
require.Equal(nodeID, node.Node.ID)
// Make a session.
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
// Acquire and get the key via a transaction, but don't supply a valid
// session.
key := testKey()
value := []byte("test")
ops := TxnOps{
&TxnOp{
KV: &KVTxnOp{
Verb: KVLock,
Key: key,
Value: value,
},
},
&TxnOp{
KV: &KVTxnOp{
Verb: KVGet,
Key: key,
},
},
&TxnOp{
Node: &NodeTxnOp{
Verb: NodeGet,
Node: Node{Node: "foo"},
},
},
&TxnOp{
Service: &ServiceTxnOp{
Verb: ServiceGet,
Node: "foo",
Service: AgentService{ID: "foo1"},
},
},
&TxnOp{
Check: &CheckTxnOp{
Verb: CheckGet,
Check: HealthCheck{Node: "foo", CheckID: "bar"},
},
},
}
ok, ret, _, err := txn.Txn(ops, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if ok {
t.Fatalf("transaction should have failed")
}
if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 {
t.Fatalf("bad: %v", ret.Errors[2])
}
if ret.Errors[0].OpIndex != 0 ||
!strings.Contains(ret.Errors[0].What, "missing session") ||
!strings.Contains(ret.Errors[1].What, "doesn't exist") {
t.Fatalf("bad: %v", ret.Errors[0])
}
// Now poke in a real session and try again.
ops[0].KV.Session = id
ok, ret, _, err = txn.Txn(ops, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 5 {
t.Fatalf("bad: %v", ret)
}
expected := TxnResults{
&TxnResult{
KV: &KVPair{
Key: key,
Session: id,
LockIndex: 1,
CreateIndex: ret.Results[0].KV.CreateIndex,
ModifyIndex: ret.Results[0].KV.ModifyIndex,
},
},
&TxnResult{
KV: &KVPair{
Key: key,
Session: id,
Value: []byte("test"),
LockIndex: 1,
CreateIndex: ret.Results[1].KV.CreateIndex,
ModifyIndex: ret.Results[1].KV.ModifyIndex,
},
},
&TxnResult{
Node: &Node{
ID: nodeID,
Node: "foo",
Address: "2.2.2.2",
Datacenter: "dc1",
CreateIndex: ret.Results[2].Node.CreateIndex,
ModifyIndex: ret.Results[2].Node.CreateIndex,
},
},
&TxnResult{
Service: &CatalogService{
ID: "foo1",
CreateIndex: ret.Results[3].Service.CreateIndex,
ModifyIndex: ret.Results[3].Service.CreateIndex,
},
},
&TxnResult{
Check: &HealthCheck{
Node: "foo",
CheckID: "bar",
Status: "critical",
Definition: HealthCheckDefinition{
TCP: "1.1.1.1",
Interval: 5 * time.Second,
},
CreateIndex: ret.Results[4].Check.CreateIndex,
ModifyIndex: ret.Results[4].Check.CreateIndex,
},
},
}
verify.Values(t, "", ret.Results, expected)
// Run a read-only transaction.
ops = TxnOps{
&TxnOp{
KV: &KVTxnOp{
Verb: KVGet,
Key: key,
},
},
&TxnOp{
Node: &NodeTxnOp{
Verb: NodeGet,
Node: Node{ID: s.Config.NodeID, Node: s.Config.NodeName},
},
},
}
ok, ret, _, err = txn.Txn(ops, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
expected = TxnResults{
&TxnResult{
KV: &KVPair{
Key: key,
Session: id,
Value: []byte("test"),
LockIndex: 1,
CreateIndex: ret.Results[0].KV.CreateIndex,
ModifyIndex: ret.Results[0].KV.ModifyIndex,
},
},
&TxnResult{
Node: &Node{
ID: s.Config.NodeID,
Node: s.Config.NodeName,
Address: "127.0.0.1",
Datacenter: "dc1",
TaggedAddresses: map[string]string{
"lan": s.Config.Bind,
"wan": s.Config.Bind,
},
Meta: map[string]string{"consul-network-segment": ""},
CreateIndex: ret.Results[1].Node.CreateIndex,
ModifyIndex: ret.Results[1].Node.ModifyIndex,
},
},
}
verify.Values(t, "", ret.Results, expected)
// Sanity check using the regular GET API.
kv := c.KV()
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}

View File

@ -3,19 +3,18 @@ layout: api
page_title: Transaction - HTTP API
sidebar_current: api-txn
description: |-
The /txn endpoints manage updates or fetches of multiple keys inside a single,
atomic transaction.
The /txn endpoint manages multiple operations in Consul, including catalog updates and fetches of multiple KV entries inside a single, atomic transaction.
---
# Transactions HTTP API
The `/txn` endpoints manage updates or fetches of multiple keys inside a single,
atomic transaction. It is important to note that each datacenter has its own KV
store, and there is no built-in replication between datacenters.
The `/txn` endpoint manages multiple operations in Consul, including catalog
updates and fetches of multiple KV entries inside a single, atomic
transaction.
## Create Transaction
This endpoint permits submitting a list of operations to apply to the KV store
This endpoint permits submitting a list of operations to apply to Consul
inside of a transaction. If any operation fails, the transaction is rolled back
and none of the changes are applied.
@ -43,7 +42,7 @@ The table below shows this endpoint's support for
| Blocking Queries | Consistency Modes | Agent Caching | ACL Required |
| ---------------- | ----------------- | ------------- | ------------ |
| `NO` | `all`<sup>1</sup> | `none` | `key:read,key:write`<sup>2</sup> |
| `NO` | `all`<sup>1</sup> | `none` | `key:read,key:write`<br>`node:read,node:write`<br>`service:read,service:write`<sup>2</sup>
<sup>1</sup> For read-only transactions
<br>
@ -55,7 +54,7 @@ The table below shows this endpoint's support for
to the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.
- `KV` is the only available operation type, though other types may be added in the future.
- `KV` operations have the following fields:
- `Verb` `(string: <required>)` - Specifies the type of operation to perform.
Please see the table below for available verbs.
@ -75,6 +74,31 @@ The table below shows this endpoint's support for
- `Session` `(string: "")` - Specifies a session. See the table below for more
information.
- `Node` operations have the following fields:
- `Verb` `(string: <required>)` - Specifies the type of operation to perform.
- `Node` `(Node: <required>)` - Specifies the node information to use
for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object. Note the only the node can be specified here, not any services or checks - separate service or check operations must be used for those.
- `Service` operations have the following fields:
- `Verb` `(string: <required>)` - Specifies the type of operation to perform.
- `Node` `(string: <required>)` = Specifies the name of the node to use for
this service operation.
- `Service` `(Service: <required>)` - Specifies the service instance information to use
for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object.
- `Check` operations have the following fields:
- `Verb` `(string: <required>)` - Specifies the type of operation to perform.
- `Service` `(Service: <required>)` - Specifies the check to use
for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object.
Please see the table below for available verbs.
### Sample Payload
The body of the request should be a list of operations to perform inside the
@ -91,6 +115,48 @@ atomic transaction. Up to 64 operations may be present in a single transaction.
"Index": <index>,
"Session": "<session id>"
}
},
{
"Node": {
"Verb": "set",
"Node": {
"ID": "67539c9d-b948-ba67-edd4-d07a676d6673",
"Node": "bar",
"Address": "192.168.0.1",
"Datacenter": "dc1",
"Meta": {
"instance_type": "m2.large"
}
}
}
},
{
"Service": {
"Verb": "delete",
"Node": "foo",
"Service": {
"ID": "db1"
}
}
},
{
"Check": {
"Verb": "cas",
"Check": {
"Node": "bar",
"CheckID": "service:web1",
"Name": "Web HTTP Check",
"Status": "critical",
"ServiceID": "web1",
"ServiceName": "web",
"ServiceTags": null,
"Definition": {
"HTTP": "http://localhost:8080",
"Interval": "10s"
},
"ModifyIndex": 22
}
}
}
]
```
@ -123,6 +189,39 @@ look like this:
"CreateIndex": <index>,
"ModifyIndex": <index>
}
},
{
"Node": {
"ID": "67539c9d-b948-ba67-edd4-d07a676d6673",
"Node": "bar",
"Address": "192.168.0.1",
"Datacenter": "dc1",
"TaggedAddresses": null,
"Meta": {
"instance_type": "m2.large"
},
"CreateIndex": 32,
"ModifyIndex": 32
}
},
{
"Check": {
"Node": "bar",
"CheckID": "service:web1",
"Name": "Web HTTP Check",
"Status": "critical",
"Notes": "",
"Output": "",
"ServiceID": "web1",
"ServiceName": "web",
"ServiceTags": null,
"Definition": {
"HTTP": "http://localhost:8080",
"Interval": "10s"
},
"CreateIndex": 22,
"ModifyIndex": 35
}
}
],
"Errors": [
@ -130,12 +229,13 @@ look like this:
"OpIndex": <index of failed operation>,
"What": "<error message for failed operation>"
},
...
]
}
```
- `Results` has entries for some operations if the transaction was successful.
To save space, the `Value` will be `null` for any `Verb` other than "get" or
To save space, the `Value` for KV results will be `null` for any `Verb` other than "get" or
"get-tree". Like the `/v1/kv/<key>` endpoint, `Value` will be Base64-encoded
if it is present. Also, no result entries will be added for verbs that delete
keys.
@ -145,10 +245,12 @@ look like this:
transaction, and `What` is a string with an error message about why that
operation failed.
### Table of Operations
### Tables of Operations
The following table summarizes the available verbs and the fields that apply to
that operation ("X" means a field is required and "O" means it is optional):
#### KV Operations
The following tables summarize the available verbs and the fields that apply to
those operations ("X" means a field is required and "O" means it is optional):
| Verb | Operation | Key | Value | Flags | Index | Session |
| ------------------ | -------------------------------------------- | :--: | :---: | :---: | :---: | :-----: |
@ -164,3 +266,42 @@ that operation ("X" means a field is required and "O" means it is optional):
| `delete` | Delete the key | `x` | | | | |
| `delete-tree` | Delete all keys with a prefix | `x` | | | | |
| `delete-cas` | Delete, but with CAS semantics | `x` | | | `x` | |
#### Node Operations
Node operations act on an individual node and require either a Node ID or name, giving precedence
to the ID if both are set. Delete operations will not return a result on success.
| Verb | Operation |
| ------------------ | -------------------------------------------- |
| `set` | Sets the node to the given state |
| `cas` | Sets, but with CAS semantics using the given ModifyIndex |
| `get` | Get the node, fails if it does not exist |
| `delete` | Delete the node |
| `delete-cas` | Delete, but with CAS semantics |
#### Service Operations
Service operations act on an individual service instance on the given node name. Both a node name
and valid service name are required. Delete operations will not return a result on success.
| Verb | Operation |
| ------------------ | -------------------------------------------- |
| `set` | Sets the service to the given state |
| `cas` | Sets, but with CAS semantics using the given ModifyIndex |
| `get` | Get the service, fails if it does not exist |
| `delete` | Delete the service |
| `delete-cas` | Delete, but with CAS semantics |
#### Check Operations
Check operations act on an individual health check instance on the given node name. Both a node name
and valid check ID are required. Delete operations will not return a result on success.
| Verb | Operation |
| ------------------ | -------------------------------------------- |
| `set` | Sets the health check to the given state |
| `cas` | Sets, but with CAS semantics using the given ModifyIndex |
| `get` | Get the check, fails if it does not exist |
| `delete` | Delete the check |
| `delete-cas` | Delete, but with CAS semantics |