mirror of
https://github.com/status-im/consul.git
synced 2025-01-13 15:26:48 +00:00
Refactors TxnRequest/TxnResponse into a form that will allow non-KV ops.
This isn't needed/used yet, but it's a good hook to get in there so we can add more atomic operations in the future. The Go API hides this detail so that feels like a KV-specific API. The implications on the REST API are pretty minimal.
This commit is contained in:
parent
69f58ad04a
commit
38d0f6676f
102
api/kv.go
102
api/kv.go
@ -41,7 +41,7 @@ const (
|
||||
|
||||
// KVTxnOp defines a single operation inside a transaction.
|
||||
type KVTxnOp struct {
|
||||
Op string
|
||||
Verb string
|
||||
Key string
|
||||
Value []byte
|
||||
Flags uint64
|
||||
@ -49,23 +49,14 @@ type KVTxnOp struct {
|
||||
Session string
|
||||
}
|
||||
|
||||
// KVTxn defines a set of operations to be performed inside a single transaction.
|
||||
type KVTxn []KVTxnOp
|
||||
|
||||
// KVTxnError is used to return information about an operation in a
|
||||
// KVTxnOps defines a set of operations to be performed inside a single
|
||||
// transaction.
|
||||
type KVTxnError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
type KVTxnOps []*KVTxnOp
|
||||
|
||||
// KVTxnErrors is a list of KVTxnError objects.
|
||||
type KVTxnErrors []KVTxnError
|
||||
|
||||
// KVTxnResult is used to return the results of a transaction.
|
||||
type KVTxnResult struct {
|
||||
Errors KVTxnErrors
|
||||
Results KVPairs
|
||||
// KVTxnResponse has the outcome of a transaction.
|
||||
type KVTxnResponse struct {
|
||||
Results []*KVPair
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// KV is used to manipulate the K/V API
|
||||
@ -284,43 +275,84 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
|
||||
return res, qm, nil
|
||||
}
|
||||
|
||||
// TxnOp is the internal format we send to Consul. It's not specific to KV,
|
||||
// though currently only KV operations are supported.
|
||||
type TxnOp struct {
|
||||
KVS *KVTxnOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of transaction operations.
|
||||
type TxnOps []*TxnOp
|
||||
|
||||
// TxnResult is the internal format we receive from Consul.
|
||||
type TxnResult struct {
|
||||
KVS *struct{ DirEnt *KVPair }
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult objects.
|
||||
type TxnResults []*TxnResult
|
||||
|
||||
// TxnError is used to return information about an operation in a transaction.
|
||||
type TxnError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
|
||||
// TxnErrors is a list of TxnError objects.
|
||||
type TxnErrors []*TxnError
|
||||
|
||||
// TxnResponse is the internal format we receive from Consul.
|
||||
type TxnResponse struct {
|
||||
Results TxnResults
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// Txn is used to apply multiple KV operations in a single, atomic transaction.
|
||||
// Note that Go will perform the required base64 encoding on the values
|
||||
// automatically because the type is a byte slice. Transactions are defined as a
|
||||
// list of operations to perform, using the KVOp constants and KVTxnOp structure
|
||||
// to define operations. If any operation fails, none of the changes are applied
|
||||
// to the state store.
|
||||
// to the state store. Note that this hides the internal raw transaction interface
|
||||
// and munges the input and output types into KV-specific ones for ease of use.
|
||||
// If there are more non-KV operations in the future we may break out a new
|
||||
// transaction API client, but it will be easy to keep this KV-specific variant
|
||||
// supported.
|
||||
//
|
||||
// Here's an example:
|
||||
//
|
||||
// txn := KVTxn{
|
||||
// KVTxnOp{
|
||||
// Op: KVLock,
|
||||
// ops := KVTxnOps{
|
||||
// &KVTxnOp{
|
||||
// Verb: KVLock,
|
||||
// Key: "test/lock",
|
||||
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
// Value: []byte("hello"),
|
||||
// },
|
||||
// KVTxnOp{
|
||||
// Op: KVGet,
|
||||
// &KVTxnOp{
|
||||
// Verb: KVGet,
|
||||
// Key: "another/key",
|
||||
// },
|
||||
// }
|
||||
// ok, result, _, err := kv.Txn(&txn, nil)
|
||||
// ok, response, _, err := kv.Txn(&ops, nil)
|
||||
//
|
||||
// If there is a problem making the transaction request then an error will be
|
||||
// returned. Otherwise, the ok value will be true if the transaction succeeded
|
||||
// or false if it was rolled back. The result is a structured return value which
|
||||
// or false if it was rolled back. The response is a structured return value which
|
||||
// will have the outcome of the transaction. Its Results member will have entries
|
||||
// for each operation. Deleted keys will have a nil entry in the, and to save
|
||||
// space, the Value of each key in the Results will be nil unless the operation
|
||||
// is a KVGet. If the transaction was rolled back, the Errors member will have
|
||||
// entries referencing the index of the operation that failed along with an error
|
||||
// message.
|
||||
func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) {
|
||||
func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) {
|
||||
r := k.c.newRequest("PUT", "/v1/txn")
|
||||
r.setWriteOptions(q)
|
||||
|
||||
r.obj = txn
|
||||
// Convert into the internal format since this is an all-KV txn.
|
||||
ops := make(TxnOps, 0, len(txn))
|
||||
for _, kvsOp := range txn {
|
||||
ops = append(ops, &TxnOp{KVS: kvsOp})
|
||||
}
|
||||
r.obj = ops
|
||||
rtt, resp, err := k.c.doRequest(r)
|
||||
if err != nil {
|
||||
return false, nil, nil, err
|
||||
@ -331,11 +363,23 @@ func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, e
|
||||
wm.RequestTime = rtt
|
||||
|
||||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
|
||||
var result KVTxnResult
|
||||
if err := decodeBody(resp, &result); err != nil {
|
||||
var txnResp TxnResponse
|
||||
if err := decodeBody(resp, &txnResp); err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
return resp.StatusCode == http.StatusOK, &result, wm, nil
|
||||
|
||||
// Convert from the internal format.
|
||||
kvResp := KVTxnResponse{
|
||||
Errors: txnResp.Errors,
|
||||
}
|
||||
for _, result := range txnResp.Results {
|
||||
var entry *KVPair
|
||||
if result.KVS != nil {
|
||||
entry = result.KVS.DirEnt
|
||||
}
|
||||
kvResp.Results = append(kvResp.Results, entry)
|
||||
}
|
||||
return resp.StatusCode == http.StatusOK, &kvResp, wm, nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
@ -466,18 +466,18 @@ func TestClient_Txn(t *testing.T) {
|
||||
// session.
|
||||
key := testKey()
|
||||
value := []byte("test")
|
||||
txn := KVTxn{
|
||||
KVTxnOp{
|
||||
Op: KVLock,
|
||||
txn := KVTxnOps{
|
||||
&KVTxnOp{
|
||||
Verb: KVLock,
|
||||
Key: key,
|
||||
Value: value,
|
||||
},
|
||||
KVTxnOp{
|
||||
Op: KVGet,
|
||||
&KVTxnOp{
|
||||
Verb: KVGet,
|
||||
Key: key,
|
||||
},
|
||||
}
|
||||
ok, ret, _, err := kv.Txn(&txn, nil)
|
||||
ok, ret, _, err := kv.Txn(txn, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if ok {
|
||||
@ -494,7 +494,7 @@ func TestClient_Txn(t *testing.T) {
|
||||
|
||||
// Now poke in a real session and try again.
|
||||
txn[0].Session = id
|
||||
ok, ret, _, err = kv.Txn(&txn, nil)
|
||||
ok, ret, _, err = kv.Txn(txn, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if !ok {
|
||||
|
@ -10,14 +10,14 @@ import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
// fixupValues takes the raw decoded JSON and base64 decodes all the values,
|
||||
// fixupKVSOps takes the raw decoded JSON and base64 decodes all the KVS values,
|
||||
// replacing them with byte arrays with the data.
|
||||
func fixupValues(raw interface{}) error {
|
||||
func fixupKVSOps(raw interface{}) error {
|
||||
// decodeValue decodes the value member of the given operation.
|
||||
decodeValue := func(rawOp interface{}) error {
|
||||
rawMap, ok := rawOp.(map[string]interface{})
|
||||
decodeValue := func(rawKVS interface{}) error {
|
||||
rawMap, ok := rawKVS.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected raw op type: %T", rawOp)
|
||||
return fmt.Errorf("unexpected raw KVS type: %T", rawKVS)
|
||||
}
|
||||
for k, v := range rawMap {
|
||||
switch strings.ToLower(k) {
|
||||
@ -41,7 +41,25 @@ func fixupValues(raw interface{}) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// fixupKVSOp looks for non-nil KVS operations and passes them on for
|
||||
// value conversion.
|
||||
fixupKVSOp := func(rawOp interface{}) error {
|
||||
rawMap, ok := rawOp.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected raw op type: %T", rawOp)
|
||||
}
|
||||
for k, v := range rawMap {
|
||||
switch strings.ToLower(k) {
|
||||
case "kvs":
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return decodeValue(v)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -50,11 +68,10 @@ func fixupValues(raw interface{}) error {
|
||||
return fmt.Errorf("unexpected raw type: %t", raw)
|
||||
}
|
||||
for _, rawOp := range rawSlice {
|
||||
if err := decodeValue(rawOp); err != nil {
|
||||
if err := fixupKVSOp(rawOp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -66,44 +83,53 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var args structs.KVSAtomicRequest
|
||||
var args structs.TxnRequest
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
// Note the body is in API format, and not the RPC format. If we can't
|
||||
// decode it, we will return a 400 since we don't have enough context to
|
||||
// associate the error with a given operation.
|
||||
var txn api.KVTxn
|
||||
if err := decodeBody(req, &txn, fixupValues); err != nil {
|
||||
var ops api.TxnOps
|
||||
if err := decodeBody(req, &ops, fixupKVSOps); err != nil {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Convert the API format into the RPC format. Note that fixupValues
|
||||
// Convert the KVS API format into the RPC format. Note that fixupKVSOps
|
||||
// above will have already converted the base64 encoded strings into
|
||||
// byte arrays so we can assign right over.
|
||||
for _, in := range txn {
|
||||
// TODO @slackpad - Verify the size here, or move that down into
|
||||
// the endpoint.
|
||||
out := &structs.KVSAtomicOp{
|
||||
Op: structs.KVSOp(in.Op),
|
||||
for _, in := range ops {
|
||||
if in.KVS != nil {
|
||||
if size := len(in.KVS.Value); size > maxKVSize {
|
||||
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||
resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)",
|
||||
in.KVS.Key, size, maxKVSize)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
out := &structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSOp(in.KVS.Verb),
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: in.Key,
|
||||
Value: in.Value,
|
||||
Flags: in.Flags,
|
||||
Session: in.Session,
|
||||
Key: in.KVS.Key,
|
||||
Value: in.KVS.Value,
|
||||
Flags: in.KVS.Flags,
|
||||
Session: in.KVS.Session,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: in.Index,
|
||||
ModifyIndex: in.KVS.Index,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
args.Ops = append(args.Ops, out)
|
||||
}
|
||||
}
|
||||
|
||||
// Make the request and return a conflict status if there were errors
|
||||
// reported from the transaction.
|
||||
var reply structs.KVSAtomicResponse
|
||||
var reply structs.TxnResponse
|
||||
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
@ -50,6 +51,34 @@ func TestTxnEndpoint_Bad_Method(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestTxnEndpoint_Bad_Size(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
|
||||
[
|
||||
{
|
||||
"KVS": {
|
||||
"Verb": "set",
|
||||
"Key": "key",
|
||||
"Value": %q
|
||||
}
|
||||
}
|
||||
]
|
||||
`, strings.Repeat("bad", 2*maxKVSize))))
|
||||
req, err := http.NewRequest("PUT", "/v1/txn", buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err := srv.Txn(resp, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 413 {
|
||||
t.Fatalf("expected 413, got %d", resp.Code)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
// Make sure all incoming fields get converted properly to the internal
|
||||
@ -60,16 +89,20 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
|
||||
[
|
||||
{
|
||||
"Op": "lock",
|
||||
"KVS": {
|
||||
"Verb": "lock",
|
||||
"Key": "key",
|
||||
"Value": "aGVsbG8gd29ybGQ=",
|
||||
"Flags": 23,
|
||||
"Session": %q
|
||||
}
|
||||
},
|
||||
{
|
||||
"Op": "get",
|
||||
"KVS": {
|
||||
"Verb": "get",
|
||||
"Key": "key"
|
||||
}
|
||||
}
|
||||
]
|
||||
`, id)))
|
||||
req, err := http.NewRequest("PUT", "/v1/txn", buf)
|
||||
@ -86,17 +119,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
t.Fatalf("expected 200, got %d", resp.Code)
|
||||
}
|
||||
|
||||
atomic, ok := obj.(structs.KVSAtomicResponse)
|
||||
txnResp, ok := obj.(structs.TxnResponse)
|
||||
if !ok {
|
||||
t.Fatalf("bad type: %T", obj)
|
||||
}
|
||||
if len(atomic.Results) != 2 {
|
||||
t.Fatalf("bad: %v", atomic)
|
||||
if len(txnResp.Results) != 2 {
|
||||
t.Fatalf("bad: %v", txnResp)
|
||||
}
|
||||
index = atomic.Results[0].ModifyIndex
|
||||
expected := structs.KVSAtomicResponse{
|
||||
Results: structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
index = txnResp.Results[0].KVS.DirEnt.ModifyIndex
|
||||
expected := structs.TxnResponse{
|
||||
Results: structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "key",
|
||||
Value: nil,
|
||||
Flags: 23,
|
||||
@ -107,7 +142,11 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
ModifyIndex: index,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "key",
|
||||
Value: []byte("hello world"),
|
||||
Flags: 23,
|
||||
@ -119,9 +158,11 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(atomic, expected) {
|
||||
t.Fatalf("bad: %v", atomic)
|
||||
if !reflect.DeepEqual(txnResp, expected) {
|
||||
t.Fatalf("bad: %v", txnResp)
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,15 +172,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
|
||||
[
|
||||
{
|
||||
"Op": "cas",
|
||||
"KVS": {
|
||||
"Verb": "cas",
|
||||
"Key": "key",
|
||||
"Value": "Z29vZGJ5ZSB3b3JsZA==",
|
||||
"Index": %d
|
||||
}
|
||||
},
|
||||
{
|
||||
"Op": "get",
|
||||
"KVS": {
|
||||
"Verb": "get",
|
||||
"Key": "key"
|
||||
}
|
||||
}
|
||||
]
|
||||
`, index)))
|
||||
req, err := http.NewRequest("PUT", "/v1/txn", buf)
|
||||
@ -156,17 +201,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
t.Fatalf("expected 200, got %d", resp.Code)
|
||||
}
|
||||
|
||||
atomic, ok := obj.(structs.KVSAtomicResponse)
|
||||
txnResp, ok := obj.(structs.TxnResponse)
|
||||
if !ok {
|
||||
t.Fatalf("bad type: %T", obj)
|
||||
}
|
||||
if len(atomic.Results) != 2 {
|
||||
t.Fatalf("bad: %v", atomic)
|
||||
if len(txnResp.Results) != 2 {
|
||||
t.Fatalf("bad: %v", txnResp)
|
||||
}
|
||||
modIndex := atomic.Results[0].ModifyIndex
|
||||
expected := structs.KVSAtomicResponse{
|
||||
Results: structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
modIndex := txnResp.Results[0].KVS.DirEnt.ModifyIndex
|
||||
expected := structs.TxnResponse{
|
||||
Results: structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "key",
|
||||
Value: nil,
|
||||
Session: id,
|
||||
@ -175,7 +222,11 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
ModifyIndex: modIndex,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "key",
|
||||
Value: []byte("goodbye world"),
|
||||
Session: id,
|
||||
@ -185,9 +236,11 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(atomic, expected) {
|
||||
t.Fatalf("bad: %v", atomic)
|
||||
if !reflect.DeepEqual(txnResp, expected) {
|
||||
t.Fatalf("bad: %v", txnResp)
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -197,15 +250,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
|
||||
buf := bytes.NewBuffer([]byte(`
|
||||
[
|
||||
{
|
||||
"Op": "lock",
|
||||
"KVS": {
|
||||
"Verb": "lock",
|
||||
"Key": "key",
|
||||
"Value": "aGVsbG8gd29ybGQ=",
|
||||
"Session": "nope"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Op": "get",
|
||||
"KVS": {
|
||||
"Verb": "get",
|
||||
"Key": "key"
|
||||
}
|
||||
}
|
||||
]
|
||||
`))
|
||||
req, err := http.NewRequest("PUT", "/v1/txn", buf)
|
||||
|
@ -83,8 +83,6 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
||||
return c.applyDeregister(buf[1:], log.Index)
|
||||
case structs.KVSRequestType:
|
||||
return c.applyKVSOperation(buf[1:], log.Index)
|
||||
case structs.KVSAtomicRequestType:
|
||||
return c.applyKVSAtomicOperation(buf[1:], log.Index)
|
||||
case structs.SessionRequestType:
|
||||
return c.applySessionOperation(buf[1:], log.Index)
|
||||
case structs.ACLRequestType:
|
||||
@ -95,6 +93,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
||||
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
|
||||
case structs.PreparedQueryRequestType:
|
||||
return c.applyPreparedQueryOperation(buf[1:], log.Index)
|
||||
case structs.TxnRequestType:
|
||||
return c.applyTxn(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
@ -195,16 +195,6 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyKVSAtomicOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.KVSAtomicRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs-atomic"}, time.Now())
|
||||
entries, errors := c.state.KVSAtomicUpdate(index, req.Ops)
|
||||
return structs.KVSAtomicResponse{errors, entries}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.SessionRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
@ -298,6 +288,16 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
|
||||
var req structs.TxnRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
|
||||
results, errors := c.state.TxnRun(index, req.Ops)
|
||||
return structs.TxnResponse{results, errors}
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
|
@ -1241,6 +1241,47 @@ func TestFSM_TombstoneReap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_Txn(t *testing.T) {
|
||||
fsm, err := NewFSM(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Set a key using a transaction.
|
||||
req := structs.TxnRequest{
|
||||
Datacenter: "dc1",
|
||||
Ops: structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.TxnRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if _, ok := resp.(structs.TxnResponse); !ok {
|
||||
t.Fatalf("bad response type: %T", resp)
|
||||
}
|
||||
|
||||
// Verify key is set directly in the state store.
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("missing")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_IgnoreUnknown(t *testing.T) {
|
||||
fsm, err := NewFSM(nil, os.Stderr)
|
||||
if err != nil {
|
||||
|
@ -31,9 +31,9 @@ func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.Dir
|
||||
return false, permissionDeniedErr
|
||||
}
|
||||
|
||||
case structs.KVSAtomicGet,
|
||||
structs.KVSAtomicCheckSession,
|
||||
structs.KVSAtomicCheckIndex:
|
||||
case structs.KVSGet,
|
||||
structs.KVSCheckSession,
|
||||
structs.KVSCheckIndex:
|
||||
if !acl.KeyRead(dirEnt.Key) {
|
||||
return false, permissionDeniedErr
|
||||
}
|
||||
|
@ -579,103 +579,6 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// KVSAtomicUpdate performs a series of updates atomically, all inside a single
|
||||
// transaction that only succeeds if all the operations succeed.
|
||||
func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.KVSAtomicErrors) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Dispatch all of the operations inside the transaction.
|
||||
entries := make(structs.DirEntries, 0, len(ops))
|
||||
errors := make(structs.KVSAtomicErrors, 0, len(ops))
|
||||
for i, op := range ops {
|
||||
var entry *structs.DirEntry
|
||||
var err error
|
||||
|
||||
switch op.Op {
|
||||
case structs.KVSSet:
|
||||
entry = &op.DirEnt
|
||||
err = s.kvsSetTxn(tx, idx, entry, false)
|
||||
|
||||
case structs.KVSDelete:
|
||||
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSDeleteCAS:
|
||||
var ok bool
|
||||
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSDeleteTree:
|
||||
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSCAS:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsSetCASTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSLock:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsLockTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSUnlock:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsUnlockTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSAtomicGet:
|
||||
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSAtomicCheckSession:
|
||||
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
|
||||
|
||||
case structs.KVSAtomicCheckIndex:
|
||||
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown operation %q", op.Op)
|
||||
}
|
||||
|
||||
// Accumulate the entries. For a GET we keep the value, otherwise
|
||||
// we clone and blank out the value (we have to clone so we don't
|
||||
// modify the entry being used by the state store).
|
||||
if entry != nil {
|
||||
if op.Op == structs.KVSAtomicGet {
|
||||
entries = append(entries, entry)
|
||||
} else {
|
||||
clone := entry.Clone()
|
||||
clone.Value = nil
|
||||
entries = append(entries, clone)
|
||||
}
|
||||
} else {
|
||||
entries = append(entries, nil)
|
||||
}
|
||||
|
||||
// Capture any error along with the index of the operation that
|
||||
// failed.
|
||||
if err != nil {
|
||||
errors = append(errors, &structs.KVSAtomicError{i, err.Error()})
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
return nil, errors
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// kvsCheckSessionTxn checks to see if the given session matches the current
|
||||
// entry for a key.
|
||||
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
|
||||
|
@ -1214,453 +1214,6 @@ func TestStateStore_KVSUnlock(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Atomic(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs entries in the state store.
|
||||
testSetKey(t, s, 1, "foo/delete", "bar")
|
||||
testSetKey(t, s, 2, "foo/bar/baz", "baz")
|
||||
testSetKey(t, s, 3, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 4, "foo/zorp", "zorp")
|
||||
testSetKey(t, s, 5, "foo/update", "stale")
|
||||
|
||||
// Make a real session.
|
||||
testRegisterNode(t, s, 6, "node1")
|
||||
session := testUUID()
|
||||
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Set up a transaction that hits every operation.
|
||||
ops := structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDelete,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/zorp",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDeleteCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDeleteTree,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/bar",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "not/there",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSUnlock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: "",
|
||||
},
|
||||
},
|
||||
}
|
||||
entries, errors := s.KVSAtomicUpdate(8, ops)
|
||||
if len(errors) > 0 {
|
||||
t.Fatalf("err: %v", errors)
|
||||
}
|
||||
if len(entries) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
|
||||
}
|
||||
|
||||
// Make sure the response looks as expected.
|
||||
expected := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
nil, // delete
|
||||
nil, // delete tree
|
||||
nil, // delete CAS
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("stale"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
nil, // get on not/there
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
nil, // get on foo/lock before it's created
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(entries) != len(expected) {
|
||||
t.Fatalf("bad: %v", entries)
|
||||
}
|
||||
for i, _ := range entries {
|
||||
if !reflect.DeepEqual(entries[i], expected[i]) {
|
||||
t.Fatalf("bad %d: %v != %v", i, *(entries[i]), *(expected[i]))
|
||||
}
|
||||
}
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 8 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
expected = structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
Value: []byte("one"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(actual) != len(expected) {
|
||||
t.Fatalf("bad len: %d != %d", len(actual), len(expected))
|
||||
}
|
||||
for i, _ := range actual {
|
||||
if !reflect.DeepEqual(actual[i], expected[i]) {
|
||||
t.Fatalf("bad %d: %v != %v", i, *(actual[i]), *(expected[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Atomic_Rollback(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs entries in the state store.
|
||||
testSetKey(t, s, 1, "foo/delete", "bar")
|
||||
testSetKey(t, s, 2, "foo/update", "stale")
|
||||
|
||||
testRegisterNode(t, s, 3, "node1")
|
||||
session := testUUID()
|
||||
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
|
||||
if !ok || err != nil {
|
||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||
}
|
||||
|
||||
bogus := testUUID()
|
||||
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// This function verifies that the state store wasn't changed.
|
||||
verifyStateStore := func(desc string) {
|
||||
idx, actual, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err (%s): %s", desc, err)
|
||||
}
|
||||
if idx != 5 {
|
||||
t.Fatalf("bad index (%s): %d", desc, idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
expected := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
Value: []byte("bar"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Value: []byte("foo"),
|
||||
LockIndex: 1,
|
||||
Session: session,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("stale"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(actual) != len(expected) {
|
||||
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(expected))
|
||||
}
|
||||
for i, _ := range actual {
|
||||
if !reflect.DeepEqual(actual[i], expected[i]) {
|
||||
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(expected[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
verifyStateStore("initial")
|
||||
|
||||
// Set up a transaction that fails every operation.
|
||||
ops := structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSUnlock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: "nope",
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
},
|
||||
},
|
||||
}
|
||||
entries, errors := s.KVSAtomicUpdate(7, ops)
|
||||
if len(errors) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
|
||||
}
|
||||
if len(entries) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(entries))
|
||||
}
|
||||
verifyStateStore("after")
|
||||
|
||||
// Make sure the errors look reasonable.
|
||||
expected := []string{
|
||||
"index is stale",
|
||||
"lock is already held",
|
||||
"lock isn't held, or is held by another session",
|
||||
"current session",
|
||||
`key "nope" doesn't exist`,
|
||||
"current modify index",
|
||||
`key "nope" doesn't exist`,
|
||||
"unknown operation",
|
||||
}
|
||||
if len(errors) != len(expected) {
|
||||
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
|
||||
}
|
||||
for i, msg := range expected {
|
||||
if errors[i].OpIndex != i {
|
||||
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
|
||||
}
|
||||
if !strings.Contains(errors[i].Error(), msg) {
|
||||
t.Fatalf("bad %d: %v", i, errors[i].Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
@ -1893,73 +1446,6 @@ func TestStateStore_KVS_Watches(t *testing.T) {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Verify that a basic transaction triggers multiple watches. We call
|
||||
// the same underlying methods that are called above so this is more
|
||||
// of a sanity check.
|
||||
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two"),
|
||||
},
|
||||
},
|
||||
}
|
||||
entries, errors := s.KVSAtomicUpdate(15, ops)
|
||||
if len(entries) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
|
||||
}
|
||||
if len(errors) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(errors))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Verify that a rolled back transaction doesn't trigger any watches.
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one-updated"),
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two-updated"),
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/nope",
|
||||
Value: []byte("nope"),
|
||||
},
|
||||
},
|
||||
}
|
||||
entries, errors := s.KVSAtomicUpdate(16, ops)
|
||||
if len(errors) != 1 {
|
||||
t.Fatalf("bad len: %d != 1", len(errors))
|
||||
}
|
||||
if len(entries) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(entries))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
|
||||
|
123
consul/state/txn.go
Normal file
123
consul/state/txn.go
Normal file
@ -0,0 +1,123 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVSOp) (*structs.TxnKVSResult, error) {
|
||||
var entry *structs.DirEntry
|
||||
var err error
|
||||
|
||||
switch op.Verb {
|
||||
case structs.KVSSet:
|
||||
entry = &op.DirEnt
|
||||
err = s.kvsSetTxn(tx, idx, entry, false)
|
||||
|
||||
case structs.KVSDelete:
|
||||
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSDeleteCAS:
|
||||
var ok bool
|
||||
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSDeleteTree:
|
||||
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSCAS:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsSetCASTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSLock:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsLockTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSUnlock:
|
||||
var ok bool
|
||||
entry = &op.DirEnt
|
||||
ok, err = s.kvsUnlockTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
|
||||
}
|
||||
|
||||
case structs.KVSGet:
|
||||
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
|
||||
|
||||
case structs.KVSCheckSession:
|
||||
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
|
||||
|
||||
case structs.KVSCheckIndex:
|
||||
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown KVS verb %q", op.Verb)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For a GET we keep the value, otherwise we clone and blank out the
|
||||
// value (we have to clone so we don't modify the entry being used by
|
||||
// the state store).
|
||||
if entry != nil {
|
||||
if op.Verb == structs.KVSGet {
|
||||
return &structs.TxnKVSResult{entry}, nil
|
||||
}
|
||||
|
||||
clone := entry.Clone()
|
||||
clone.Value = nil
|
||||
return &structs.TxnKVSResult{clone}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TxnRun tries to run the given operations all inside a single transaction. If
|
||||
// any of the operations fail, the entire transaction will be rolled back.
|
||||
func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Dispatch all of the operations inside the transaction.
|
||||
results := make(structs.TxnResults, 0, len(ops))
|
||||
errors := make(structs.TxnErrors, 0, len(ops))
|
||||
for i, op := range ops {
|
||||
var result structs.TxnResult
|
||||
var err error
|
||||
|
||||
// Dispatch based on the type of operation.
|
||||
if op.KVS != nil {
|
||||
result.KVS, err = s.txnKVS(tx, idx, op.KVS)
|
||||
} else {
|
||||
err = fmt.Errorf("no operation specified")
|
||||
}
|
||||
|
||||
// Accumulate the results.
|
||||
results = append(results, &result)
|
||||
|
||||
// Capture any error along with the index of the operation that
|
||||
// failed.
|
||||
if err != nil {
|
||||
errors = append(errors, &structs.TxnError{i, err.Error()})
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
return nil, errors
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return results, nil
|
||||
}
|
624
consul/state/txn_test.go
Normal file
624
consul/state/txn_test.go
Normal file
@ -0,0 +1,624 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func TestStateStore_Txn_KVS(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs results in the state store.
|
||||
testSetKey(t, s, 1, "foo/delete", "bar")
|
||||
testSetKey(t, s, 2, "foo/bar/baz", "baz")
|
||||
testSetKey(t, s, 3, "foo/bar/zip", "zip")
|
||||
testSetKey(t, s, 4, "foo/zorp", "zorp")
|
||||
testSetKey(t, s, 5, "foo/update", "stale")
|
||||
|
||||
// Make a real session.
|
||||
testRegisterNode(t, s, 6, "node1")
|
||||
session := testUUID()
|
||||
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Set up a transaction that hits every operation.
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDelete,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/zorp",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDeleteCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDeleteTree,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "not/there",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSUnlock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRun(8, ops)
|
||||
if len(errors) > 0 {
|
||||
t.Fatalf("err: %v", errors)
|
||||
}
|
||||
if len(results) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(results), len(ops))
|
||||
}
|
||||
|
||||
// Make sure the response looks as expected.
|
||||
expected := structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{}, // delete
|
||||
&structs.TxnResult{}, // delete tree
|
||||
&structs.TxnResult{}, // delete CAS
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("stale"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{}, // get on not/there
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{}, // get on foo/lock before it's created
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: session,
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(results) != len(expected) {
|
||||
t.Fatalf("bad: %v", results)
|
||||
}
|
||||
for i, _ := range results {
|
||||
if !reflect.DeepEqual(results[i], expected[i]) {
|
||||
t.Fatalf("bad %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 8 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
entries := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
LockIndex: 1,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/new",
|
||||
Value: []byte("one"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(actual) != len(entries) {
|
||||
t.Fatalf("bad len: %d != %d", len(actual), len(entries))
|
||||
}
|
||||
for i, _ := range actual {
|
||||
if !reflect.DeepEqual(actual[i], entries[i]) {
|
||||
t.Fatalf("bad %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create kvs results in the state store.
|
||||
testSetKey(t, s, 1, "foo/delete", "bar")
|
||||
testSetKey(t, s, 2, "foo/update", "stale")
|
||||
|
||||
testRegisterNode(t, s, 3, "node1")
|
||||
session := testUUID()
|
||||
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
|
||||
if !ok || err != nil {
|
||||
t.Fatalf("didn't get the lock: %v %s", ok, err)
|
||||
}
|
||||
|
||||
bogus := testUUID()
|
||||
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// This function verifies that the state store wasn't changed.
|
||||
verifyStateStore := func(desc string) {
|
||||
idx, actual, err := s.KVSList("")
|
||||
if err != nil {
|
||||
t.Fatalf("err (%s): %s", desc, err)
|
||||
}
|
||||
if idx != 5 {
|
||||
t.Fatalf("bad index (%s): %d", desc, idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
entries := structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
Value: []byte("bar"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Value: []byte("foo"),
|
||||
LockIndex: 1,
|
||||
Session: session,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
ModifyIndex: 5,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("stale"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(actual) != len(entries) {
|
||||
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries))
|
||||
}
|
||||
for i, _ := range actual {
|
||||
if !reflect.DeepEqual(actual[i], entries[i]) {
|
||||
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(entries[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
verifyStateStore("initial")
|
||||
|
||||
// Set up a transaction that fails every operation.
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/update",
|
||||
Value: []byte("new"),
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSUnlock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
Session: bogus,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/lock",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: "nope",
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo/delete",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRun(7, ops)
|
||||
if len(errors) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(results))
|
||||
}
|
||||
verifyStateStore("after")
|
||||
|
||||
// Make sure the errors look reasonable.
|
||||
expected := []string{
|
||||
"index is stale",
|
||||
"lock is already held",
|
||||
"lock isn't held, or is held by another session",
|
||||
"current session",
|
||||
`key "nope" doesn't exist`,
|
||||
"current modify index",
|
||||
`key "nope" doesn't exist`,
|
||||
"unknown KVS verb",
|
||||
}
|
||||
if len(errors) != len(expected) {
|
||||
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
|
||||
}
|
||||
for i, msg := range expected {
|
||||
if errors[i].OpIndex != i {
|
||||
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
|
||||
}
|
||||
if !strings.Contains(errors[i].Error(), msg) {
|
||||
t.Fatalf("bad %d: %v", i, errors[i].Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Verify that a basic transaction triggers multiple watches. We call
|
||||
// the same underlying methods that are called above so this is more
|
||||
// of a sanity check.
|
||||
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRun(15, ops)
|
||||
if len(results) != len(ops) {
|
||||
t.Fatalf("bad len: %d != %d", len(results), len(ops))
|
||||
}
|
||||
if len(errors) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(errors))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Verify that a rolled back transaction doesn't trigger any watches.
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
|
||||
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/one",
|
||||
Value: []byte("one-updated"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/two",
|
||||
Value: []byte("two-updated"),
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "multi/nope",
|
||||
Value: []byte("nope"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRun(16, ops)
|
||||
if len(errors) != 1 {
|
||||
t.Fatalf("bad len: %d != 1", len(errors))
|
||||
}
|
||||
if len(results) != 0 {
|
||||
t.Fatalf("bad len: %d != 0", len(results))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
@ -36,7 +36,7 @@ const (
|
||||
TombstoneRequestType
|
||||
CoordinateBatchUpdateType
|
||||
PreparedQueryRequestType
|
||||
KVSAtomicRequestType
|
||||
TxnRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
@ -535,11 +535,11 @@ const (
|
||||
KVSLock = "lock" // Lock a key
|
||||
KVSUnlock = "unlock" // Unlock a key
|
||||
|
||||
// KVSAtomic* operations are only available in KVSAtomicRequest
|
||||
// transactions.
|
||||
KVSAtomicGet = "get" // Read the key during the transaction.
|
||||
KVSAtomicCheckSession = "check-session" // Check the session holds the key.
|
||||
KVSAtomicCheckIndex = "check-index" // Check the modify index of the key.
|
||||
// The following operations are only available inside of atomic
|
||||
// transactions via the Txn request.
|
||||
KVSGet = "get" // Read the key during the transaction.
|
||||
KVSCheckSession = "check-session" // Check the session holds the key.
|
||||
KVSCheckIndex = "check-index" // Check the modify index of the key.
|
||||
)
|
||||
|
||||
// KVSRequest is used to operate on the Key-Value store
|
||||
@ -554,49 +554,6 @@ func (r *KVSRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// KVSAtomicOp is used to define a single operation within an multi-key
|
||||
// transaction.
|
||||
type KVSAtomicOp struct {
|
||||
Op KVSOp
|
||||
DirEnt DirEntry
|
||||
}
|
||||
|
||||
// KVSAtomicOps is a list of atomic operations.
|
||||
type KVSAtomicOps []*KVSAtomicOp
|
||||
|
||||
// KVSAtomicRequest is used to perform atomic multi-key operations on the
|
||||
// Key-Value store.
|
||||
type KVSAtomicRequest struct {
|
||||
Datacenter string
|
||||
Ops KVSAtomicOps
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *KVSAtomicRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// KVSAtomicError is used to return information about an error for a specific
|
||||
// operation.
|
||||
type KVSAtomicError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
|
||||
// Error returns the string representation of an atomic error.
|
||||
func (e KVSAtomicError) Error() string {
|
||||
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
|
||||
}
|
||||
|
||||
// KVSAtomicErrors is a list of KVSAtomicError entries.
|
||||
type KVSAtomicErrors []*KVSAtomicError
|
||||
|
||||
// KVSAtomicResponse is the structure returned by a KVSAtomicRequest.
|
||||
type KVSAtomicResponse struct {
|
||||
Errors KVSAtomicErrors
|
||||
Results DirEntries
|
||||
}
|
||||
|
||||
// KeyRequest is used to request a key, or key prefix
|
||||
type KeyRequest struct {
|
||||
Datacenter string
|
||||
|
69
consul/structs/txn.go
Normal file
69
consul/structs/txn.go
Normal file
@ -0,0 +1,69 @@
|
||||
package structs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// TxnKVSOp is used to define a single operation on the KVS inside a
|
||||
// transaction
|
||||
type TxnKVSOp struct {
|
||||
Verb KVSOp
|
||||
DirEnt DirEntry
|
||||
}
|
||||
|
||||
// TxnKVSResult is used to define the result of a single operation on the KVS
|
||||
// inside a transaction.
|
||||
type TxnKVSResult struct {
|
||||
DirEnt *DirEntry
|
||||
}
|
||||
|
||||
// TxnOp is used to define a single operation inside a transaction. Only one
|
||||
// of the types should be filled out per entry.
|
||||
type TxnOp struct {
|
||||
KVS *TxnKVSOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of operations within a transaction.
|
||||
type TxnOps []*TxnOp
|
||||
|
||||
// TxnRequest is used to apply multiple operations to the state store in a
|
||||
// single transaction
|
||||
type TxnRequest struct {
|
||||
Datacenter string
|
||||
Ops TxnOps
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *TxnRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// TxnError is used to return information about an error for a specific
|
||||
// operation.
|
||||
type TxnError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
|
||||
// Error returns the string representation of an atomic error.
|
||||
func (e TxnError) Error() string {
|
||||
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
|
||||
}
|
||||
|
||||
// TxnErrors is a list of TxnError entries.
|
||||
type TxnErrors []*TxnError
|
||||
|
||||
// TxnResult is used to define the result of a given operation inside a
|
||||
// transaction. Only one of the types should be filled out per entry.
|
||||
type TxnResult struct {
|
||||
KVS *TxnKVSResult
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult entries.
|
||||
type TxnResults []*TxnResult
|
||||
|
||||
// TxnResponse is the structure returned by a TxnRequest.
|
||||
type TxnResponse struct {
|
||||
Results TxnResults
|
||||
Errors TxnErrors
|
||||
}
|
@ -14,24 +14,26 @@ type Txn struct {
|
||||
}
|
||||
|
||||
// Apply is used to apply multiple operations in a single, atomic transaction.
|
||||
func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error {
|
||||
func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
|
||||
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
|
||||
|
||||
// Perform the pre-apply checks on each of the operations.
|
||||
// Perform the pre-apply checks for any KVS operations.
|
||||
acl, err := t.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, op := range args.Ops {
|
||||
ok, err := kvsPreApply(t.srv, acl, op.Op, &op.DirEnt)
|
||||
if op.KVS != nil {
|
||||
ok, err := kvsPreApply(t.srv, acl, op.KVS.Verb, &op.KVS.DirEnt)
|
||||
if err != nil {
|
||||
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
|
||||
reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
|
||||
} else if !ok {
|
||||
err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key)
|
||||
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
|
||||
err = fmt.Errorf("failed to lock key %q due to lock delay", op.KVS.DirEnt.Key)
|
||||
reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(reply.Errors) > 0 {
|
||||
@ -39,9 +41,9 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp
|
||||
}
|
||||
|
||||
// Apply the update.
|
||||
resp, err := t.srv.raftApply(structs.KVSAtomicRequestType, args)
|
||||
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
|
||||
if err != nil {
|
||||
t.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err)
|
||||
t.srv.logger.Printf("[ERR] consul.txn: Apply failed: %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
@ -50,8 +52,8 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp
|
||||
|
||||
// Convert the return type. This should be a cheap copy since we are
|
||||
// just taking the two slices.
|
||||
if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok {
|
||||
*reply = respAtomic
|
||||
if txnResp, ok := resp.(structs.TxnResponse); ok {
|
||||
*reply = txnResp
|
||||
} else {
|
||||
return fmt.Errorf("unexpected return type %T", resp)
|
||||
}
|
||||
|
@ -25,26 +25,30 @@ func TestTxn_Apply(t *testing.T) {
|
||||
// Do a super basic request. The state store test covers the details so
|
||||
// we just need to be sure that the transaction is sent correctly and
|
||||
// the results are converted appropriately.
|
||||
arg := structs.KVSAtomicRequest{
|
||||
arg := structs.TxnRequest{
|
||||
Datacenter: "dc1",
|
||||
Ops: structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
Ops: structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "test",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "test",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
var out structs.KVSAtomicResponse
|
||||
var out structs.TxnResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@ -64,9 +68,11 @@ func TestTxn_Apply(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify the transaction's return value.
|
||||
expected := structs.KVSAtomicResponse{
|
||||
Results: structs.DirEntries{
|
||||
&structs.DirEntry{
|
||||
expected := structs.TxnResponse{
|
||||
Results: structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "test",
|
||||
Flags: 42,
|
||||
Value: nil,
|
||||
@ -75,7 +81,11 @@ func TestTxn_Apply(t *testing.T) {
|
||||
ModifyIndex: d.ModifyIndex,
|
||||
},
|
||||
},
|
||||
&structs.DirEntry{
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
KVS: &structs.TxnKVSResult{
|
||||
DirEnt: &structs.DirEntry{
|
||||
Key: "test",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
@ -85,6 +95,8 @@ func TestTxn_Apply(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(out, expected) {
|
||||
t.Fatalf("bad %v", out)
|
||||
@ -124,81 +136,101 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
|
||||
|
||||
// Set up a transaction where every operation should get blocked due to
|
||||
// ACLs.
|
||||
arg := structs.KVSAtomicRequest{
|
||||
arg := structs.TxnRequest{
|
||||
Datacenter: "dc1",
|
||||
Ops: structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSSet,
|
||||
Ops: structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDelete,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDelete,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDeleteCAS,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDeleteCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSDeleteTree,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSDeleteTree,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSCAS,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCAS,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSLock,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSUnlock,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSUnlock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "foo",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicGet,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSGet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckSession,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckSession,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
},
|
||||
},
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSAtomicCheckIndex,
|
||||
},
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSCheckIndex,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "nope",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
var out structs.KVSAtomicResponse
|
||||
var out structs.TxnResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the transaction's return value.
|
||||
var expected structs.KVSAtomicResponse
|
||||
var expected structs.TxnResponse
|
||||
for i, _ := range arg.Ops {
|
||||
expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()})
|
||||
expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()})
|
||||
}
|
||||
if !reflect.DeepEqual(out, expected) {
|
||||
t.Fatalf("bad %v", out)
|
||||
@ -245,20 +277,22 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
|
||||
validId := session.ID
|
||||
|
||||
// Make a lock request via an atomic transaction.
|
||||
arg := structs.KVSAtomicRequest{
|
||||
arg := structs.TxnRequest{
|
||||
Datacenter: "dc1",
|
||||
Ops: structs.KVSAtomicOps{
|
||||
&structs.KVSAtomicOp{
|
||||
Op: structs.KVSLock,
|
||||
Ops: structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
KVS: &structs.TxnKVSOp{
|
||||
Verb: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "test",
|
||||
Session: validId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
{
|
||||
var out structs.KVSAtomicResponse
|
||||
var out structs.TxnResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@ -275,13 +309,13 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
|
||||
|
||||
// Should acquire.
|
||||
{
|
||||
var out structs.KVSAtomicResponse
|
||||
var out structs.TxnResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(out.Results) != 1 ||
|
||||
len(out.Errors) != 0 ||
|
||||
out.Results[0].LockIndex != 2 {
|
||||
out.Results[0].KVS.DirEnt.LockIndex != 2 {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user