mirror of https://github.com/status-im/consul.git
api: add support for new txn operations
This commit is contained in:
parent
de4dbf583e
commit
67bac7a815
|
@ -1361,11 +1361,6 @@ func vetNodeTxnOp(op *structs.TxnNodeOp, rule acl.Authorizer) error {
|
|||
|
||||
node := op.Node
|
||||
|
||||
// Filtering for GETs is done on the output side.
|
||||
if op.Verb == api.NodeGet {
|
||||
return nil
|
||||
}
|
||||
|
||||
n := &api.Node{
|
||||
Node: node.Node,
|
||||
ID: string(node.ID),
|
||||
|
@ -1399,11 +1394,6 @@ func vetServiceTxnOp(op *structs.TxnServiceOp, rule acl.Authorizer) error {
|
|||
|
||||
service := op.Service
|
||||
|
||||
// Filtering for GETs is done on the output side.
|
||||
if op.Verb == api.ServiceGet {
|
||||
return nil
|
||||
}
|
||||
|
||||
n := &api.Node{Node: op.Node}
|
||||
svc := &api.AgentService{
|
||||
ID: service.ID,
|
||||
|
@ -1431,11 +1421,6 @@ func vetCheckTxnOp(op *structs.TxnCheckOp, rule acl.Authorizer) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Filtering for GETs is done on the output side.
|
||||
if op.Verb == api.CheckGet {
|
||||
return nil
|
||||
}
|
||||
|
||||
n := &api.Node{Node: op.Check.Node}
|
||||
svc := &api.AgentService{
|
||||
ID: op.Check.ServiceID,
|
||||
|
|
|
@ -491,14 +491,22 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
|
|||
idx := maxIndexTxn(tx, "nodes")
|
||||
|
||||
// Retrieve the node from the state store
|
||||
node, err := tx.First("nodes", "id", id)
|
||||
node, err := getNodeTxn(tx, id)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if node != nil {
|
||||
return idx, node.(*structs.Node), nil
|
||||
return idx, node, nil
|
||||
}
|
||||
|
||||
func getNodeTxn(tx *memdb.Txn, nodeName string) (*structs.Node, error) {
|
||||
node, err := tx.First("nodes", "id", nodeName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
return idx, nil, nil
|
||||
if node != nil {
|
||||
return node.(*structs.Node), nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func getNodeIDTxn(tx *memdb.Txn, id types.NodeID) (*structs.Node, error) {
|
||||
|
|
|
@ -129,7 +129,14 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc
|
|||
|
||||
switch op.Verb {
|
||||
case api.NodeGet:
|
||||
entry, err = getNodeIDTxn(tx, op.Node.ID)
|
||||
if op.Node.ID != "" {
|
||||
entry, err = getNodeIDTxn(tx, op.Node.ID)
|
||||
} else {
|
||||
entry, err = getNodeTxn(tx, op.Node.Node)
|
||||
}
|
||||
if entry == nil && err == nil {
|
||||
err = fmt.Errorf("node %q doesn't exist", op.Node.Node)
|
||||
}
|
||||
|
||||
case api.NodeSet:
|
||||
err = s.ensureNodeTxn(tx, idx, &op.Node)
|
||||
|
@ -188,6 +195,9 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp)
|
|||
switch op.Verb {
|
||||
case api.ServiceGet:
|
||||
entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID)
|
||||
if entry == nil && err == nil {
|
||||
err = fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node)
|
||||
}
|
||||
|
||||
case api.ServiceSet:
|
||||
err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service)
|
||||
|
|
|
@ -279,27 +279,32 @@ func TestStateStore_Txn_Service(t *testing.T) {
|
|||
Service: "svc1",
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc5",
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 6,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc5",
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc2",
|
||||
Tags: []string{"modified"},
|
||||
ID: "svc2",
|
||||
Tags: []string{"modified"},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// Txn endpoint is used to perform multi-object atomic transactions.
|
||||
|
@ -37,6 +38,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
|
|||
})
|
||||
}
|
||||
case op.Node != nil:
|
||||
// Skip the pre-apply checks if this is a GET.
|
||||
if op.Node.Verb == api.NodeGet {
|
||||
break
|
||||
}
|
||||
|
||||
node := op.Node.Node
|
||||
if err := nodePreApply(node.Node, string(node.ID)); err != nil {
|
||||
errors = append(errors, &structs.TxnError{
|
||||
|
@ -54,6 +60,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
|
|||
})
|
||||
}
|
||||
case op.Service != nil:
|
||||
// Skip the pre-apply checks if this is a GET.
|
||||
if op.Service.Verb == api.ServiceGet {
|
||||
break
|
||||
}
|
||||
|
||||
service := &op.Service.Service
|
||||
if err := servicePreApply(service, nil); err != nil {
|
||||
errors = append(errors, &structs.TxnError{
|
||||
|
@ -71,6 +82,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
|
|||
})
|
||||
}
|
||||
case op.Check != nil:
|
||||
// Skip the pre-apply checks if this is a GET.
|
||||
if op.Check.Verb == api.CheckGet {
|
||||
break
|
||||
}
|
||||
|
||||
checkPreApply(&op.Check.Check)
|
||||
|
||||
// Check that the token has permissions for the given operation.
|
||||
|
@ -103,6 +119,21 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
|
|||
return nil
|
||||
}
|
||||
|
||||
str := ""
|
||||
for _, op := range args.Ops {
|
||||
switch {
|
||||
case op.KV != nil:
|
||||
str += fmt.Sprintf("%#v\n", op.KV)
|
||||
case op.Node != nil:
|
||||
str += fmt.Sprintf("%#v\n", op.Node)
|
||||
case op.Service != nil:
|
||||
str += fmt.Sprintf("%#v\n", op.Service)
|
||||
case op.Check != nil:
|
||||
str += fmt.Sprintf("%#v\n", op.Check)
|
||||
}
|
||||
}
|
||||
//return fmt.Errorf("%s", str)
|
||||
|
||||
// Apply the update.
|
||||
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
|
||||
if err != nil {
|
||||
|
|
|
@ -446,7 +446,18 @@ func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error)
|
|||
return err
|
||||
}
|
||||
}
|
||||
return mapstructure.Decode(raw, out)
|
||||
|
||||
decodeConf := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
|
||||
Result: &out,
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(decodeConf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return decoder.Decode(raw)
|
||||
}
|
||||
|
||||
// setTranslateAddr is used to set the address translation header. This is only
|
||||
|
|
|
@ -2,6 +2,7 @@ package structs
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
|
@ -893,14 +894,61 @@ type HealthCheck struct {
|
|||
}
|
||||
|
||||
type HealthCheckDefinition struct {
|
||||
HTTP string `json:",omitempty"`
|
||||
TLSSkipVerify bool `json:",omitempty"`
|
||||
Header map[string][]string `json:",omitempty"`
|
||||
Method string `json:",omitempty"`
|
||||
TCP string `json:",omitempty"`
|
||||
Interval api.ReadableDuration `json:",omitempty"`
|
||||
Timeout api.ReadableDuration `json:",omitempty"`
|
||||
DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"`
|
||||
HTTP string `json:",omitempty"`
|
||||
TLSSkipVerify bool `json:",omitempty"`
|
||||
Header map[string][]string `json:",omitempty"`
|
||||
Method string `json:",omitempty"`
|
||||
TCP string `json:",omitempty"`
|
||||
Interval time.Duration `json:",omitempty"`
|
||||
Timeout time.Duration `json:",omitempty"`
|
||||
DeregisterCriticalServiceAfter time.Duration `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {
|
||||
type Alias HealthCheckDefinition
|
||||
return json.Marshal(&struct {
|
||||
Interval string
|
||||
Timeout string
|
||||
DeregisterCriticalServiceAfter string
|
||||
*Alias
|
||||
}{
|
||||
Interval: d.Interval.String(),
|
||||
Timeout: d.Timeout.String(),
|
||||
DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(),
|
||||
Alias: (*Alias)(d),
|
||||
})
|
||||
}
|
||||
|
||||
func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error {
|
||||
type Alias HealthCheckDefinition
|
||||
aux := &struct {
|
||||
Interval string
|
||||
Timeout string
|
||||
DeregisterCriticalServiceAfter string
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(d),
|
||||
}
|
||||
if err := json.Unmarshal(data, &aux); err != nil {
|
||||
return err
|
||||
}
|
||||
var err error
|
||||
if aux.Interval != "" {
|
||||
if d.Interval, err = time.ParseDuration(aux.Interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if aux.Timeout != "" {
|
||||
if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if aux.DeregisterCriticalServiceAfter != "" {
|
||||
if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsSame checks if one HealthCheck is the same as another, without looking
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -48,9 +49,9 @@ func decodeValue(rawKV interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// fixupKVOp looks for non-nil KV operations and passes them on for
|
||||
// fixupTxnOp looks for non-nil Txn operations and passes them on for
|
||||
// value conversion.
|
||||
func fixupKVOp(rawOp interface{}) error {
|
||||
func fixupTxnOp(rawOp interface{}) error {
|
||||
rawMap, ok := rawOp.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected raw op type: %T", rawOp)
|
||||
|
@ -67,15 +68,15 @@ func fixupKVOp(rawOp interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops,
|
||||
// fixupTxnOps takes the raw decoded JSON and base64 decodes values in Txn ops,
|
||||
// replacing them with byte arrays.
|
||||
func fixupKVOps(raw interface{}) error {
|
||||
func fixupTxnOps(raw interface{}) error {
|
||||
rawSlice, ok := raw.([]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected raw type: %t", raw)
|
||||
}
|
||||
for _, rawOp := range rawSlice {
|
||||
if err := fixupKVOp(rawOp); err != nil {
|
||||
if err := fixupTxnOp(rawOp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -100,7 +101,7 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
|
|||
// decode it, we will return a 400 since we don't have enough context to
|
||||
// associate the error with a given operation.
|
||||
var ops api.TxnOps
|
||||
if err := decodeBody(req, &ops, fixupKVOps); err != nil {
|
||||
if err := decodeBody(req, &ops, fixupTxnOps); err != nil {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
fmt.Fprintf(resp, "Failed to parse body: %v", err)
|
||||
return nil, 0, false
|
||||
|
@ -123,7 +124,8 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
|
|||
var writes int
|
||||
var netKVSize int
|
||||
for _, in := range ops {
|
||||
if in.KV != nil {
|
||||
switch {
|
||||
case in.KV != nil:
|
||||
size := len(in.KV.Value)
|
||||
if size > maxKVSize {
|
||||
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||
|
@ -152,6 +154,102 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
|
|||
},
|
||||
}
|
||||
opsRPC = append(opsRPC, out)
|
||||
|
||||
case in.Node != nil:
|
||||
if in.Node.Verb != api.NodeGet {
|
||||
writes++
|
||||
}
|
||||
|
||||
// Setup the default DC if not provided
|
||||
if in.Node.Node.Datacenter == "" {
|
||||
in.Node.Node.Datacenter = s.agent.config.Datacenter
|
||||
}
|
||||
|
||||
node := in.Node.Node
|
||||
out := &structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: in.Node.Verb,
|
||||
Node: structs.Node{
|
||||
ID: types.NodeID(node.ID),
|
||||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
Datacenter: node.Datacenter,
|
||||
TaggedAddresses: node.TaggedAddresses,
|
||||
Meta: node.Meta,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: node.ModifyIndex,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
opsRPC = append(opsRPC, out)
|
||||
|
||||
case in.Service != nil:
|
||||
if in.Service.Verb != api.ServiceGet {
|
||||
writes++
|
||||
}
|
||||
|
||||
svc := in.Service.Service
|
||||
out := &structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: in.Service.Verb,
|
||||
Node: in.Service.Node,
|
||||
Service: structs.NodeService{
|
||||
ID: svc.ServiceID,
|
||||
Service: svc.ServiceName,
|
||||
Tags: svc.ServiceTags,
|
||||
Address: svc.ServiceAddress,
|
||||
Meta: svc.ServiceMeta,
|
||||
Port: svc.ServicePort,
|
||||
Weights: &structs.Weights{
|
||||
Passing: svc.ServiceWeights.Passing,
|
||||
Warning: svc.ServiceWeights.Warning,
|
||||
},
|
||||
EnableTagOverride: svc.ServiceEnableTagOverride,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: svc.ModifyIndex,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
opsRPC = append(opsRPC, out)
|
||||
|
||||
case in.Check != nil:
|
||||
if in.Check.Verb != api.CheckGet {
|
||||
writes++
|
||||
}
|
||||
|
||||
check := in.Check.Check
|
||||
out := &structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: in.Check.Verb,
|
||||
Check: structs.HealthCheck{
|
||||
Node: check.Node,
|
||||
CheckID: types.CheckID(check.CheckID),
|
||||
Name: check.Name,
|
||||
Status: check.Status,
|
||||
Notes: check.Notes,
|
||||
Output: check.Output,
|
||||
ServiceID: check.ServiceID,
|
||||
ServiceName: check.ServiceName,
|
||||
ServiceTags: check.ServiceTags,
|
||||
Definition: structs.HealthCheckDefinition{
|
||||
HTTP: check.Definition.HTTP,
|
||||
TLSSkipVerify: check.Definition.TLSSkipVerify,
|
||||
Header: check.Definition.Header,
|
||||
Method: check.Definition.Method,
|
||||
TCP: check.Definition.TCP,
|
||||
Interval: check.Definition.Interval,
|
||||
Timeout: check.Definition.Timeout,
|
||||
DeregisterCriticalServiceAfter: check.Definition.DeregisterCriticalServiceAfter,
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: check.ModifyIndex,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
opsRPC = append(opsRPC, out)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,6 +278,7 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
|
|||
|
||||
// Fast-path a transaction with only writes to the read-only endpoint,
|
||||
// which bypasses Raft, and allows for staleness.
|
||||
s.agent.logger.Printf("ops: %d", len(ops))
|
||||
conflict := false
|
||||
var ret interface{}
|
||||
if writes == 0 {
|
||||
|
@ -209,6 +308,7 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
|
|||
return nil, err
|
||||
}
|
||||
ret, conflict = reply, len(reply.Errors) > 0
|
||||
s.agent.logger.Printf("results: %d, errors: %d", len(reply.Results), len(reply.Errors))
|
||||
}
|
||||
|
||||
// If there was a conflict return the response object but set a special
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -36,6 +38,9 @@ type HealthCheck struct {
|
|||
ServiceTags []string
|
||||
|
||||
Definition HealthCheckDefinition
|
||||
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
||||
// HealthCheckDefinition is used to store the details about
|
||||
|
@ -46,9 +51,56 @@ type HealthCheckDefinition struct {
|
|||
Method string
|
||||
TLSSkipVerify bool
|
||||
TCP string
|
||||
Interval ReadableDuration
|
||||
Timeout ReadableDuration
|
||||
DeregisterCriticalServiceAfter ReadableDuration
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
DeregisterCriticalServiceAfter time.Duration
|
||||
}
|
||||
|
||||
func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {
|
||||
type Alias HealthCheckDefinition
|
||||
return json.Marshal(&struct {
|
||||
Interval string
|
||||
Timeout string
|
||||
DeregisterCriticalServiceAfter string
|
||||
*Alias
|
||||
}{
|
||||
Interval: d.Interval.String(),
|
||||
Timeout: d.Timeout.String(),
|
||||
DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(),
|
||||
Alias: (*Alias)(d),
|
||||
})
|
||||
}
|
||||
|
||||
func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error {
|
||||
type Alias HealthCheckDefinition
|
||||
aux := &struct {
|
||||
Interval string
|
||||
Timeout string
|
||||
DeregisterCriticalServiceAfter string
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(d),
|
||||
}
|
||||
if err := json.Unmarshal(data, &aux); err != nil {
|
||||
return err
|
||||
}
|
||||
var err error
|
||||
if aux.Interval != "" {
|
||||
if d.Interval, err = time.ParseDuration(aux.Interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if aux.Timeout != "" {
|
||||
if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if aux.DeregisterCriticalServiceAfter != "" {
|
||||
if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealthChecks is a collection of HealthCheck structs.
|
||||
|
|
|
@ -213,9 +213,9 @@ func TestAPI_HealthChecks(t *testing.T) {
|
|||
if meta.LastIndex == 0 {
|
||||
r.Fatalf("bad: %v", meta)
|
||||
}
|
||||
if got, want := out, checks; !verify.Values(t, "checks", got, want) {
|
||||
r.Fatal("health.Checks failed")
|
||||
}
|
||||
checks[0].CreateIndex = out[0].CreateIndex
|
||||
checks[0].ModifyIndex = out[0].ModifyIndex
|
||||
verify.Values(r, "checks", out, checks)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -265,7 +265,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
|
|||
// The Txn function has been deprecated from the KV object; please see the Txn
|
||||
// object for more information about Transactions.
|
||||
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
|
||||
ops := make(TxnOps, len(txn))
|
||||
var ops TxnOps
|
||||
for _, op := range txn {
|
||||
ops = append(ops, &TxnOp{KV: op})
|
||||
}
|
||||
|
|
|
@ -456,7 +456,7 @@ func TestAPI_ClientAcquireRelease(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAPI_ClientTxn(t *testing.T) {
|
||||
func TestAPI_KVClientTxn(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
|
37
api/txn.go
37
api/txn.go
|
@ -20,8 +20,10 @@ func (c *Client) Txn() *Txn {
|
|||
// TxnOp is the internal format we send to Consul. Currently only K/V and
|
||||
// check operations are supported.
|
||||
type TxnOp struct {
|
||||
KV *KVTxnOp
|
||||
Check *CheckTxnOp
|
||||
KV *KVTxnOp
|
||||
Node *NodeTxnOp
|
||||
Service *ServiceTxnOp
|
||||
Check *CheckTxnOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of transaction operations.
|
||||
|
@ -29,8 +31,10 @@ type TxnOps []*TxnOp
|
|||
|
||||
// TxnResult is the internal format we receive from Consul.
|
||||
type TxnResult struct {
|
||||
KV *KVPair
|
||||
Check *HealthCheck
|
||||
KV *KVPair
|
||||
Node *Node
|
||||
Service *CatalogService
|
||||
Check *HealthCheck
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult objects.
|
||||
|
@ -100,6 +104,12 @@ const (
|
|||
NodeDeleteCAS NodeOp = "delete-cas"
|
||||
)
|
||||
|
||||
// NodeTxnOp defines a single operation inside a transaction.
|
||||
type NodeTxnOp struct {
|
||||
Verb NodeOp
|
||||
Node Node
|
||||
}
|
||||
|
||||
// ServiceOp constants give possible operations available in a transaction.
|
||||
type ServiceOp string
|
||||
|
||||
|
@ -111,6 +121,13 @@ const (
|
|||
ServiceDeleteCAS ServiceOp = "delete-cas"
|
||||
)
|
||||
|
||||
// ServiceTxnOp defines a single operation inside a transaction.
|
||||
type ServiceTxnOp struct {
|
||||
Verb ServiceOp
|
||||
Node string
|
||||
Service CatalogService
|
||||
}
|
||||
|
||||
// CheckOp constants give possible operations available in a transaction.
|
||||
type CheckOp string
|
||||
|
||||
|
@ -185,17 +202,7 @@ func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMet
|
|||
r := c.newRequest("PUT", "/v1/txn")
|
||||
r.setQueryOptions(q)
|
||||
|
||||
// Convert into the internal txn format.
|
||||
ops := make(TxnOps, 0, len(txn))
|
||||
for _, kvOp := range txn {
|
||||
switch {
|
||||
case kvOp.KV != nil:
|
||||
ops = append(ops, &TxnOp{KV: kvOp.KV})
|
||||
case kvOp.Check != nil:
|
||||
ops = append(ops, &TxnOp{Check: kvOp.Check})
|
||||
}
|
||||
}
|
||||
r.obj = ops
|
||||
r.obj = txn
|
||||
rtt, resp, err := c.doRequest(r)
|
||||
if err != nil {
|
||||
return false, nil, nil, err
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAPI_ClientTxn(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
session := c.Session()
|
||||
txn := c.Txn()
|
||||
|
||||
// Set up a test service and health check.
|
||||
nodeID, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
catalog := c.Catalog()
|
||||
reg := &CatalogRegistration{
|
||||
ID: nodeID,
|
||||
Node: "foo",
|
||||
Address: "2.2.2.2",
|
||||
Service: &AgentService{
|
||||
ID: "foo1",
|
||||
Service: "foo",
|
||||
},
|
||||
Check: &AgentCheck{
|
||||
CheckID: "bar",
|
||||
Status: "critical",
|
||||
Definition: HealthCheckDefinition{
|
||||
TCP: "1.1.1.1",
|
||||
Interval: 5 * time.Second,
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err = catalog.Register(reg, nil)
|
||||
require.NoError(err)
|
||||
|
||||
node, _, err := catalog.Node("foo", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(nodeID, node.Node.ID)
|
||||
|
||||
// Make a session.
|
||||
id, _, err := session.CreateNoChecks(nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer session.Destroy(id, nil)
|
||||
|
||||
// Acquire and get the key via a transaction, but don't supply a valid
|
||||
// session.
|
||||
key := testKey()
|
||||
value := []byte("test")
|
||||
ops := TxnOps{
|
||||
&TxnOp{
|
||||
KV: &KVTxnOp{
|
||||
Verb: KVLock,
|
||||
Key: key,
|
||||
Value: value,
|
||||
},
|
||||
},
|
||||
&TxnOp{
|
||||
KV: &KVTxnOp{
|
||||
Verb: KVGet,
|
||||
Key: key,
|
||||
},
|
||||
},
|
||||
&TxnOp{
|
||||
Node: &NodeTxnOp{
|
||||
Verb: NodeGet,
|
||||
Node: Node{Node: "foo"},
|
||||
},
|
||||
},
|
||||
&TxnOp{
|
||||
Service: &ServiceTxnOp{
|
||||
Verb: ServiceGet,
|
||||
Node: "foo",
|
||||
Service: CatalogService{ServiceID: "foo1"},
|
||||
},
|
||||
},
|
||||
&TxnOp{
|
||||
Check: &CheckTxnOp{
|
||||
Verb: CheckGet,
|
||||
Check: HealthCheck{Node: "foo", CheckID: "bar"},
|
||||
},
|
||||
},
|
||||
}
|
||||
ok, ret, _, err := txn.Txn(ops, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if ok {
|
||||
t.Fatalf("transaction should have failed")
|
||||
}
|
||||
|
||||
if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 {
|
||||
t.Fatalf("bad: %v", ret.Errors[2])
|
||||
}
|
||||
if ret.Errors[0].OpIndex != 0 ||
|
||||
!strings.Contains(ret.Errors[0].What, "missing session") ||
|
||||
!strings.Contains(ret.Errors[1].What, "doesn't exist") {
|
||||
t.Fatalf("bad: %v", ret.Errors[0])
|
||||
}
|
||||
|
||||
// Now poke in a real session and try again.
|
||||
ops[0].KV.Session = id
|
||||
ok, ret, _, err = txn.Txn(ops, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if !ok {
|
||||
t.Fatalf("transaction failure")
|
||||
}
|
||||
|
||||
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 5 {
|
||||
t.Fatalf("bad: %v", ret)
|
||||
}
|
||||
expected := TxnResults{
|
||||
&TxnResult{
|
||||
KV: &KVPair{
|
||||
Key: key,
|
||||
Session: id,
|
||||
LockIndex: 1,
|
||||
CreateIndex: ret.Results[0].KV.CreateIndex,
|
||||
ModifyIndex: ret.Results[0].KV.ModifyIndex,
|
||||
},
|
||||
},
|
||||
&TxnResult{
|
||||
KV: &KVPair{
|
||||
Key: key,
|
||||
Session: id,
|
||||
Value: []byte("test"),
|
||||
LockIndex: 1,
|
||||
CreateIndex: ret.Results[1].KV.CreateIndex,
|
||||
ModifyIndex: ret.Results[1].KV.ModifyIndex,
|
||||
},
|
||||
},
|
||||
&TxnResult{
|
||||
Node: &Node{
|
||||
ID: nodeID,
|
||||
Node: "foo",
|
||||
Address: "2.2.2.2",
|
||||
Datacenter: "dc1",
|
||||
CreateIndex: ret.Results[2].Node.CreateIndex,
|
||||
ModifyIndex: ret.Results[2].Node.CreateIndex,
|
||||
},
|
||||
},
|
||||
&TxnResult{
|
||||
Service: &CatalogService{
|
||||
ID: "foo1",
|
||||
CreateIndex: ret.Results[3].Service.CreateIndex,
|
||||
ModifyIndex: ret.Results[3].Service.CreateIndex,
|
||||
},
|
||||
},
|
||||
&TxnResult{
|
||||
Check: &HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "bar",
|
||||
Status: "critical",
|
||||
Definition: HealthCheckDefinition{
|
||||
TCP: "1.1.1.1",
|
||||
Interval: 5 * time.Second,
|
||||
},
|
||||
CreateIndex: ret.Results[4].Check.CreateIndex,
|
||||
ModifyIndex: ret.Results[4].Check.CreateIndex,
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", ret.Results, expected)
|
||||
|
||||
// Run a read-only transaction.
|
||||
ops = TxnOps{
|
||||
&TxnOp{
|
||||
KV: &KVTxnOp{
|
||||
Verb: KVGet,
|
||||
Key: key,
|
||||
},
|
||||
},
|
||||
&TxnOp{
|
||||
Node: &NodeTxnOp{
|
||||
Verb: NodeGet,
|
||||
Node: Node{ID: s.Config.NodeID, Node: s.Config.NodeName},
|
||||
},
|
||||
},
|
||||
}
|
||||
ok, ret, _, err = txn.Txn(ops, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if !ok {
|
||||
t.Fatalf("transaction failure")
|
||||
}
|
||||
|
||||
expected = TxnResults{
|
||||
&TxnResult{
|
||||
KV: &KVPair{
|
||||
Key: key,
|
||||
Session: id,
|
||||
Value: []byte("test"),
|
||||
LockIndex: 1,
|
||||
CreateIndex: ret.Results[0].KV.CreateIndex,
|
||||
ModifyIndex: ret.Results[0].KV.ModifyIndex,
|
||||
},
|
||||
},
|
||||
&TxnResult{
|
||||
Node: &Node{
|
||||
ID: s.Config.NodeID,
|
||||
Node: s.Config.NodeName,
|
||||
Address: "127.0.0.1",
|
||||
Datacenter: "dc1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"lan": s.Config.Bind,
|
||||
"wan": s.Config.Bind,
|
||||
},
|
||||
Meta: map[string]string{"consul-network-segment": ""},
|
||||
CreateIndex: ret.Results[1].Node.CreateIndex,
|
||||
ModifyIndex: ret.Results[1].Node.ModifyIndex,
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", ret.Results, expected)
|
||||
|
||||
// Sanity check using the regular GET API.
|
||||
kv := c.KV()
|
||||
pair, meta, err := kv.Get(key, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if pair == nil {
|
||||
t.Fatalf("expected value: %#v", pair)
|
||||
}
|
||||
if pair.LockIndex != 1 {
|
||||
t.Fatalf("Expected lock: %v", pair)
|
||||
}
|
||||
if pair.Session != id {
|
||||
t.Fatalf("Expected lock: %v", pair)
|
||||
}
|
||||
if meta.LastIndex == 0 {
|
||||
t.Fatalf("unexpected value: %#v", meta)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue