diff --git a/api/kv.go b/api/kv.go index 0d5599b530..db6e25b39b 100644 --- a/api/kv.go +++ b/api/kv.go @@ -41,7 +41,7 @@ const ( // KVTxnOp defines a single operation inside a transaction. type KVTxnOp struct { - Op string + Verb string Key string Value []byte Flags uint64 @@ -49,23 +49,14 @@ type KVTxnOp struct { Session string } -// KVTxn defines a set of operations to be performed inside a single transaction. -type KVTxn []KVTxnOp - -// KVTxnError is used to return information about an operation in a +// KVTxnOps defines a set of operations to be performed inside a single // transaction. -type KVTxnError struct { - OpIndex int - What string -} +type KVTxnOps []*KVTxnOp -// KVTxnErrors is a list of KVTxnError objects. -type KVTxnErrors []KVTxnError - -// KVTxnResult is used to return the results of a transaction. -type KVTxnResult struct { - Errors KVTxnErrors - Results KVPairs +// KVTxnResponse has the outcome of a transaction. +type KVTxnResponse struct { + Results []*KVPair + Errors TxnErrors } // KV is used to manipulate the K/V API @@ -284,43 +275,84 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption return res, qm, nil } +// TxnOp is the internal format we send to Consul. It's not specific to KV, +// though currently only KV operations are supported. +type TxnOp struct { + KVS *KVTxnOp +} + +// TxnOps is a list of transaction operations. +type TxnOps []*TxnOp + +// TxnResult is the internal format we receive from Consul. +type TxnResult struct { + KVS *struct{ DirEnt *KVPair } +} + +// TxnResults is a list of TxnResult objects. +type TxnResults []*TxnResult + +// TxnError is used to return information about an operation in a transaction. +type TxnError struct { + OpIndex int + What string +} + +// TxnErrors is a list of TxnError objects. +type TxnErrors []*TxnError + +// TxnResponse is the internal format we receive from Consul. +type TxnResponse struct { + Results TxnResults + Errors TxnErrors +} + // Txn is used to apply multiple KV operations in a single, atomic transaction. // Note that Go will perform the required base64 encoding on the values // automatically because the type is a byte slice. Transactions are defined as a // list of operations to perform, using the KVOp constants and KVTxnOp structure // to define operations. If any operation fails, none of the changes are applied -// to the state store. +// to the state store. Note that this hides the internal raw transaction interface +// and munges the input and output types into KV-specific ones for ease of use. +// If there are more non-KV operations in the future we may break out a new +// transaction API client, but it will be easy to keep this KV-specific variant +// supported. // // Here's an example: // -// txn := KVTxn{ -// KVTxnOp{ -// Op: KVLock, -// Key: "test/lock", +// ops := KVTxnOps{ +// &KVTxnOp{ +// Verb: KVLock, +// Key: "test/lock", // Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", -// Value: []byte("hello"), +// Value: []byte("hello"), // }, -// KVTxnOp{ -// Op: KVGet, -// Key: "another/key", +// &KVTxnOp{ +// Verb: KVGet, +// Key: "another/key", // }, // } -// ok, result, _, err := kv.Txn(&txn, nil) +// ok, response, _, err := kv.Txn(&ops, nil) // // If there is a problem making the transaction request then an error will be // returned. Otherwise, the ok value will be true if the transaction succeeded -// or false if it was rolled back. The result is a structured return value which +// or false if it was rolled back. The response is a structured return value which // will have the outcome of the transaction. Its Results member will have entries // for each operation. Deleted keys will have a nil entry in the, and to save // space, the Value of each key in the Results will be nil unless the operation // is a KVGet. If the transaction was rolled back, the Errors member will have // entries referencing the index of the operation that failed along with an error // message. -func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) { +func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) { r := k.c.newRequest("PUT", "/v1/txn") r.setWriteOptions(q) - r.obj = txn + // Convert into the internal format since this is an all-KV txn. + ops := make(TxnOps, 0, len(txn)) + for _, kvsOp := range txn { + ops = append(ops, &TxnOp{KVS: kvsOp}) + } + r.obj = ops rtt, resp, err := k.c.doRequest(r) if err != nil { return false, nil, nil, err @@ -331,11 +363,23 @@ func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, e wm.RequestTime = rtt if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { - var result KVTxnResult - if err := decodeBody(resp, &result); err != nil { + var txnResp TxnResponse + if err := decodeBody(resp, &txnResp); err != nil { return false, nil, nil, err } - return resp.StatusCode == http.StatusOK, &result, wm, nil + + // Convert from the internal format. + kvResp := KVTxnResponse{ + Errors: txnResp.Errors, + } + for _, result := range txnResp.Results { + var entry *KVPair + if result.KVS != nil { + entry = result.KVS.DirEnt + } + kvResp.Results = append(kvResp.Results, entry) + } + return resp.StatusCode == http.StatusOK, &kvResp, wm, nil } var buf bytes.Buffer diff --git a/api/kv_test.go b/api/kv_test.go index d9a4ea38fa..bd432194b8 100644 --- a/api/kv_test.go +++ b/api/kv_test.go @@ -466,18 +466,18 @@ func TestClient_Txn(t *testing.T) { // session. key := testKey() value := []byte("test") - txn := KVTxn{ - KVTxnOp{ - Op: KVLock, + txn := KVTxnOps{ + &KVTxnOp{ + Verb: KVLock, Key: key, Value: value, }, - KVTxnOp{ - Op: KVGet, - Key: key, + &KVTxnOp{ + Verb: KVGet, + Key: key, }, } - ok, ret, _, err := kv.Txn(&txn, nil) + ok, ret, _, err := kv.Txn(txn, nil) if err != nil { t.Fatalf("err: %v", err) } else if ok { @@ -494,7 +494,7 @@ func TestClient_Txn(t *testing.T) { // Now poke in a real session and try again. txn[0].Session = id - ok, ret, _, err = kv.Txn(&txn, nil) + ok, ret, _, err = kv.Txn(txn, nil) if err != nil { t.Fatalf("err: %v", err) } else if !ok { diff --git a/command/agent/txn_endpoint.go b/command/agent/txn_endpoint.go index 9defacf188..31a0962a74 100644 --- a/command/agent/txn_endpoint.go +++ b/command/agent/txn_endpoint.go @@ -10,14 +10,14 @@ import ( "github.com/hashicorp/consul/consul/structs" ) -// fixupValues takes the raw decoded JSON and base64 decodes all the values, +// fixupKVSOps takes the raw decoded JSON and base64 decodes all the KVS values, // replacing them with byte arrays with the data. -func fixupValues(raw interface{}) error { +func fixupKVSOps(raw interface{}) error { // decodeValue decodes the value member of the given operation. - decodeValue := func(rawOp interface{}) error { - rawMap, ok := rawOp.(map[string]interface{}) + decodeValue := func(rawKVS interface{}) error { + rawMap, ok := rawKVS.(map[string]interface{}) if !ok { - return fmt.Errorf("unexpected raw op type: %T", rawOp) + return fmt.Errorf("unexpected raw KVS type: %T", rawKVS) } for k, v := range rawMap { switch strings.ToLower(k) { @@ -41,7 +41,25 @@ func fixupValues(raw interface{}) error { return nil } } + return nil + } + // fixupKVSOp looks for non-nil KVS operations and passes them on for + // value conversion. + fixupKVSOp := func(rawOp interface{}) error { + rawMap, ok := rawOp.(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected raw op type: %T", rawOp) + } + for k, v := range rawMap { + switch strings.ToLower(k) { + case "kvs": + if v == nil { + return nil + } + return decodeValue(v) + } + } return nil } @@ -50,11 +68,10 @@ func fixupValues(raw interface{}) error { return fmt.Errorf("unexpected raw type: %t", raw) } for _, rawOp := range rawSlice { - if err := decodeValue(rawOp); err != nil { + if err := fixupKVSOp(rawOp); err != nil { return err } } - return nil } @@ -66,44 +83,53 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface return nil, nil } - var args structs.KVSAtomicRequest + var args structs.TxnRequest s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) // Note the body is in API format, and not the RPC format. If we can't // decode it, we will return a 400 since we don't have enough context to // associate the error with a given operation. - var txn api.KVTxn - if err := decodeBody(req, &txn, fixupValues); err != nil { + var ops api.TxnOps + if err := decodeBody(req, &ops, fixupKVSOps); err != nil { resp.WriteHeader(http.StatusBadRequest) resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err))) return nil, nil } - // Convert the API format into the RPC format. Note that fixupValues + // Convert the KVS API format into the RPC format. Note that fixupKVSOps // above will have already converted the base64 encoded strings into // byte arrays so we can assign right over. - for _, in := range txn { - // TODO @slackpad - Verify the size here, or move that down into - // the endpoint. - out := &structs.KVSAtomicOp{ - Op: structs.KVSOp(in.Op), - DirEnt: structs.DirEntry{ - Key: in.Key, - Value: in.Value, - Flags: in.Flags, - Session: in.Session, - RaftIndex: structs.RaftIndex{ - ModifyIndex: in.Index, + for _, in := range ops { + if in.KVS != nil { + if size := len(in.KVS.Value); size > maxKVSize { + resp.WriteHeader(http.StatusRequestEntityTooLarge) + resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)", + in.KVS.Key, size, maxKVSize))) + return nil, nil + } + + out := &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSOp(in.KVS.Verb), + DirEnt: structs.DirEntry{ + Key: in.KVS.Key, + Value: in.KVS.Value, + Flags: in.KVS.Flags, + Session: in.KVS.Session, + RaftIndex: structs.RaftIndex{ + ModifyIndex: in.KVS.Index, + }, + }, }, - }, + } + args.Ops = append(args.Ops, out) } - args.Ops = append(args.Ops, out) } // Make the request and return a conflict status if there were errors // reported from the transaction. - var reply structs.KVSAtomicResponse + var reply structs.TxnResponse if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { return nil, err } diff --git a/command/agent/txn_endpoint_test.go b/command/agent/txn_endpoint_test.go index c4dcabb5c1..dd6ad079ae 100644 --- a/command/agent/txn_endpoint_test.go +++ b/command/agent/txn_endpoint_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "strings" "testing" "github.com/hashicorp/consul/consul/structs" @@ -50,6 +51,34 @@ func TestTxnEndpoint_Bad_Method(t *testing.T) { }) } +func TestTxnEndpoint_Bad_Size(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + buf := bytes.NewBuffer([]byte(fmt.Sprintf(` +[ + { + "KVS": { + "Verb": "set", + "Key": "key", + "Value": %q + } + } +] +`, strings.Repeat("bad", 2*maxKVSize)))) + req, err := http.NewRequest("PUT", "/v1/txn", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + if _, err := srv.Txn(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 413 { + t.Fatalf("expected 413, got %d", resp.Code) + } + }) +} + func TestTxnEndpoint_KVS_Actions(t *testing.T) { httpTest(t, func(srv *HTTPServer) { // Make sure all incoming fields get converted properly to the internal @@ -60,15 +89,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) { buf := bytes.NewBuffer([]byte(fmt.Sprintf(` [ { - "Op": "lock", - "Key": "key", - "Value": "aGVsbG8gd29ybGQ=", - "Flags": 23, - "Session": %q + "KVS": { + "Verb": "lock", + "Key": "key", + "Value": "aGVsbG8gd29ybGQ=", + "Flags": 23, + "Session": %q + } }, { - "Op": "get", - "Key": "key" + "KVS": { + "Verb": "get", + "Key": "key" + } } ] `, id))) @@ -86,42 +119,50 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) { t.Fatalf("expected 200, got %d", resp.Code) } - atomic, ok := obj.(structs.KVSAtomicResponse) + txnResp, ok := obj.(structs.TxnResponse) if !ok { t.Fatalf("bad type: %T", obj) } - if len(atomic.Results) != 2 { - t.Fatalf("bad: %v", atomic) + if len(txnResp.Results) != 2 { + t.Fatalf("bad: %v", txnResp) } - index = atomic.Results[0].ModifyIndex - expected := structs.KVSAtomicResponse{ - Results: structs.DirEntries{ - &structs.DirEntry{ - Key: "key", - Value: nil, - Flags: 23, - Session: id, - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: index, - ModifyIndex: index, + index = txnResp.Results[0].KVS.DirEnt.ModifyIndex + expected := structs.TxnResponse{ + Results: structs.TxnResults{ + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "key", + Value: nil, + Flags: 23, + Session: id, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, }, }, - &structs.DirEntry{ - Key: "key", - Value: []byte("hello world"), - Flags: 23, - Session: id, - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: index, - ModifyIndex: index, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "key", + Value: []byte("hello world"), + Flags: 23, + Session: id, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, }, }, }, } - if !reflect.DeepEqual(atomic, expected) { - t.Fatalf("bad: %v", atomic) + if !reflect.DeepEqual(txnResp, expected) { + t.Fatalf("bad: %v", txnResp) } } @@ -131,14 +172,18 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) { buf := bytes.NewBuffer([]byte(fmt.Sprintf(` [ { - "Op": "cas", - "Key": "key", - "Value": "Z29vZGJ5ZSB3b3JsZA==", - "Index": %d + "KVS": { + "Verb": "cas", + "Key": "key", + "Value": "Z29vZGJ5ZSB3b3JsZA==", + "Index": %d + } }, { - "Op": "get", - "Key": "key" + "KVS": { + "Verb": "get", + "Key": "key" + } } ] `, index))) @@ -156,38 +201,46 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) { t.Fatalf("expected 200, got %d", resp.Code) } - atomic, ok := obj.(structs.KVSAtomicResponse) + txnResp, ok := obj.(structs.TxnResponse) if !ok { t.Fatalf("bad type: %T", obj) } - if len(atomic.Results) != 2 { - t.Fatalf("bad: %v", atomic) + if len(txnResp.Results) != 2 { + t.Fatalf("bad: %v", txnResp) } - modIndex := atomic.Results[0].ModifyIndex - expected := structs.KVSAtomicResponse{ - Results: structs.DirEntries{ - &structs.DirEntry{ - Key: "key", - Value: nil, - Session: id, - RaftIndex: structs.RaftIndex{ - CreateIndex: index, - ModifyIndex: modIndex, + modIndex := txnResp.Results[0].KVS.DirEnt.ModifyIndex + expected := structs.TxnResponse{ + Results: structs.TxnResults{ + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "key", + Value: nil, + Session: id, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: modIndex, + }, + }, }, }, - &structs.DirEntry{ - Key: "key", - Value: []byte("goodbye world"), - Session: id, - RaftIndex: structs.RaftIndex{ - CreateIndex: index, - ModifyIndex: modIndex, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "key", + Value: []byte("goodbye world"), + Session: id, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: modIndex, + }, + }, }, }, }, } - if !reflect.DeepEqual(atomic, expected) { - t.Fatalf("bad: %v", atomic) + if !reflect.DeepEqual(txnResp, expected) { + t.Fatalf("bad: %v", txnResp) } } }) @@ -197,14 +250,18 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) { buf := bytes.NewBuffer([]byte(` [ { - "Op": "lock", - "Key": "key", - "Value": "aGVsbG8gd29ybGQ=", - "Session": "nope" + "KVS": { + "Verb": "lock", + "Key": "key", + "Value": "aGVsbG8gd29ybGQ=", + "Session": "nope" + } }, { - "Op": "get", - "Key": "key" + "KVS": { + "Verb": "get", + "Key": "key" + } } ] `)) diff --git a/consul/fsm.go b/consul/fsm.go index 17fede632f..fab5b8c12b 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -83,8 +83,6 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyDeregister(buf[1:], log.Index) case structs.KVSRequestType: return c.applyKVSOperation(buf[1:], log.Index) - case structs.KVSAtomicRequestType: - return c.applyKVSAtomicOperation(buf[1:], log.Index) case structs.SessionRequestType: return c.applySessionOperation(buf[1:], log.Index) case structs.ACLRequestType: @@ -95,6 +93,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyCoordinateBatchUpdate(buf[1:], log.Index) case structs.PreparedQueryRequestType: return c.applyPreparedQueryOperation(buf[1:], log.Index) + case structs.TxnRequestType: + return c.applyTxn(buf[1:], log.Index) default: if ignoreUnknown { c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -195,16 +195,6 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { } } -func (c *consulFSM) applyKVSAtomicOperation(buf []byte, index uint64) interface{} { - var req structs.KVSAtomicRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSince([]string{"consul", "fsm", "kvs-atomic"}, time.Now()) - entries, errors := c.state.KVSAtomicUpdate(index, req.Ops) - return structs.KVSAtomicResponse{errors, entries} -} - func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} { var req structs.SessionRequest if err := structs.Decode(buf, &req); err != nil { @@ -298,6 +288,16 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf } } +func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} { + var req structs.TxnRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now()) + results, errors := c.state.TxnRun(index, req.Ops) + return structs.TxnResponse{results, errors} +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 5f8b32a325..e478a6b027 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -1241,6 +1241,47 @@ func TestFSM_TombstoneReap(t *testing.T) { } } +func TestFSM_Txn(t *testing.T) { + fsm, err := NewFSM(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Set a key using a transaction. + req := structs.TxnRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + }, + }, + }, + } + buf, err := structs.Encode(structs.TxnRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(structs.TxnResponse); !ok { + t.Fatalf("bad response type: %T", resp) + } + + // Verify key is set directly in the state store. + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } +} + func TestFSM_IgnoreUnknown(t *testing.T) { fsm, err := NewFSM(nil, os.Stderr) if err != nil { diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 8e7864e2bc..0c2a05e1cc 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -31,9 +31,9 @@ func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.Dir return false, permissionDeniedErr } - case structs.KVSAtomicGet, - structs.KVSAtomicCheckSession, - structs.KVSAtomicCheckIndex: + case structs.KVSGet, + structs.KVSCheckSession, + structs.KVSCheckIndex: if !acl.KeyRead(dirEnt.Key) { return false, permissionDeniedErr } diff --git a/consul/state/kvs.go b/consul/state/kvs.go index afdf5cc9c6..2ef9012bf1 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -579,103 +579,6 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE return true, nil } -// KVSAtomicUpdate performs a series of updates atomically, all inside a single -// transaction that only succeeds if all the operations succeed. -func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.KVSAtomicErrors) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Dispatch all of the operations inside the transaction. - entries := make(structs.DirEntries, 0, len(ops)) - errors := make(structs.KVSAtomicErrors, 0, len(ops)) - for i, op := range ops { - var entry *structs.DirEntry - var err error - - switch op.Op { - case structs.KVSSet: - entry = &op.DirEnt - err = s.kvsSetTxn(tx, idx, entry, false) - - case structs.KVSDelete: - err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key) - - case structs.KVSDeleteCAS: - var ok bool - ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key) - if !ok && err == nil { - err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key) - } - - case structs.KVSDeleteTree: - err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key) - - case structs.KVSCAS: - var ok bool - entry = &op.DirEnt - ok, err = s.kvsSetCASTxn(tx, idx, entry) - if !ok && err == nil { - err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key) - } - - case structs.KVSLock: - var ok bool - entry = &op.DirEnt - ok, err = s.kvsLockTxn(tx, idx, entry) - if !ok && err == nil { - err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key) - } - - case structs.KVSUnlock: - var ok bool - entry = &op.DirEnt - ok, err = s.kvsUnlockTxn(tx, idx, entry) - if !ok && err == nil { - err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key) - } - - case structs.KVSAtomicGet: - _, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key) - - case structs.KVSAtomicCheckSession: - entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session) - - case structs.KVSAtomicCheckIndex: - entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex) - - default: - err = fmt.Errorf("unknown operation %q", op.Op) - } - - // Accumulate the entries. For a GET we keep the value, otherwise - // we clone and blank out the value (we have to clone so we don't - // modify the entry being used by the state store). - if entry != nil { - if op.Op == structs.KVSAtomicGet { - entries = append(entries, entry) - } else { - clone := entry.Clone() - clone.Value = nil - entries = append(entries, clone) - } - } else { - entries = append(entries, nil) - } - - // Capture any error along with the index of the operation that - // failed. - if err != nil { - errors = append(errors, &structs.KVSAtomicError{i, err.Error()}) - } - } - if len(errors) > 0 { - return nil, errors - } - - tx.Commit() - return entries, nil -} - // kvsCheckSessionTxn checks to see if the given session matches the current // entry for a key. func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) { diff --git a/consul/state/kvs_test.go b/consul/state/kvs_test.go index e5a552b8b1..bd8996a014 100644 --- a/consul/state/kvs_test.go +++ b/consul/state/kvs_test.go @@ -1214,453 +1214,6 @@ func TestStateStore_KVSUnlock(t *testing.T) { } } -func TestStateStore_KVS_Atomic(t *testing.T) { - s := testStateStore(t) - - // Create kvs entries in the state store. - testSetKey(t, s, 1, "foo/delete", "bar") - testSetKey(t, s, 2, "foo/bar/baz", "baz") - testSetKey(t, s, 3, "foo/bar/zip", "zip") - testSetKey(t, s, 4, "foo/zorp", "zorp") - testSetKey(t, s, 5, "foo/update", "stale") - - // Make a real session. - testRegisterNode(t, s, 6, "node1") - session := testUUID() - if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // Set up a transaction that hits every operation. - ops := structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "foo/new", - Value: []byte("one"), - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSDelete, - DirEnt: structs.DirEntry{ - Key: "foo/zorp", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSDeleteCAS, - DirEnt: structs.DirEntry{ - Key: "foo/delete", - RaftIndex: structs.RaftIndex{ - ModifyIndex: 1, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSDeleteTree, - DirEnt: structs.DirEntry{ - Key: "foo/bar", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "foo/update", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckIndex, - DirEnt: structs.DirEntry{ - Key: "foo/update", - RaftIndex: structs.RaftIndex{ - ModifyIndex: 5, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSCAS, - DirEnt: structs.DirEntry{ - Key: "foo/update", - Value: []byte("new"), - RaftIndex: structs.RaftIndex{ - ModifyIndex: 5, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "foo/update", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "not/there", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckIndex, - DirEnt: structs.DirEntry{ - Key: "foo/update", - RaftIndex: structs.RaftIndex{ - ModifyIndex: 8, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: session, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckSession, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: session, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSUnlock, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: session, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckSession, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: "", - }, - }, - } - entries, errors := s.KVSAtomicUpdate(8, ops) - if len(errors) > 0 { - t.Fatalf("err: %v", errors) - } - if len(entries) != len(ops) { - t.Fatalf("bad len: %d != %d", len(entries), len(ops)) - } - - // Make sure the response looks as expected. - expected := structs.DirEntries{ - &structs.DirEntry{ - Key: "foo/new", - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - nil, // delete - nil, // delete tree - nil, // delete CAS - &structs.DirEntry{ - Key: "foo/update", - Value: []byte("stale"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 5, - }, - }, - &structs.DirEntry{ - Key: "foo/update", - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 5, - }, - }, - &structs.DirEntry{ - Key: "foo/update", - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/update", - Value: []byte("new"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 8, - }, - }, - nil, // get on not/there - &structs.DirEntry{ - Key: "foo/update", - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 8, - }, - }, - nil, // get on foo/lock before it's created - &structs.DirEntry{ - Key: "foo/lock", - Session: session, - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/lock", - Session: session, - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/lock", - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/lock", - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - } - if len(entries) != len(expected) { - t.Fatalf("bad: %v", entries) - } - for i, _ := range entries { - if !reflect.DeepEqual(entries[i], expected[i]) { - t.Fatalf("bad %d: %v != %v", i, *(entries[i]), *(expected[i])) - } - } - - // Pull the resulting state store contents. - idx, actual, err := s.KVSList("") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 8 { - t.Fatalf("bad index: %d", idx) - } - - // Make sure it looks as expected. - expected = structs.DirEntries{ - &structs.DirEntry{ - Key: "foo/lock", - LockIndex: 1, - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/new", - Value: []byte("one"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 8, - ModifyIndex: 8, - }, - }, - &structs.DirEntry{ - Key: "foo/update", - Value: []byte("new"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 8, - }, - }, - } - if len(actual) != len(expected) { - t.Fatalf("bad len: %d != %d", len(actual), len(expected)) - } - for i, _ := range actual { - if !reflect.DeepEqual(actual[i], expected[i]) { - t.Fatalf("bad %d: %v != %v", i, *(actual[i]), *(expected[i])) - } - } -} - -func TestStateStore_KVS_Atomic_Rollback(t *testing.T) { - s := testStateStore(t) - - // Create kvs entries in the state store. - testSetKey(t, s, 1, "foo/delete", "bar") - testSetKey(t, s, 2, "foo/update", "stale") - - testRegisterNode(t, s, 3, "node1") - session := testUUID() - if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session}) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - bogus := testUUID() - if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // This function verifies that the state store wasn't changed. - verifyStateStore := func(desc string) { - idx, actual, err := s.KVSList("") - if err != nil { - t.Fatalf("err (%s): %s", desc, err) - } - if idx != 5 { - t.Fatalf("bad index (%s): %d", desc, idx) - } - - // Make sure it looks as expected. - expected := structs.DirEntries{ - &structs.DirEntry{ - Key: "foo/delete", - Value: []byte("bar"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 1, - ModifyIndex: 1, - }, - }, - &structs.DirEntry{ - Key: "foo/lock", - Value: []byte("foo"), - LockIndex: 1, - Session: session, - RaftIndex: structs.RaftIndex{ - CreateIndex: 5, - ModifyIndex: 5, - }, - }, - &structs.DirEntry{ - Key: "foo/update", - Value: []byte("stale"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 2, - ModifyIndex: 2, - }, - }, - } - if len(actual) != len(expected) { - t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(expected)) - } - for i, _ := range actual { - if !reflect.DeepEqual(actual[i], expected[i]) { - t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(expected[i])) - } - } - } - verifyStateStore("initial") - - // Set up a transaction that fails every operation. - ops := structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSCAS, - DirEnt: structs.DirEntry{ - Key: "foo/update", - Value: []byte("new"), - RaftIndex: structs.RaftIndex{ - ModifyIndex: 1, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: bogus, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSUnlock, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: bogus, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckSession, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - Session: bogus, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckSession, - DirEnt: structs.DirEntry{ - Key: "nope", - Session: bogus, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckIndex, - DirEnt: structs.DirEntry{ - Key: "foo/lock", - RaftIndex: structs.RaftIndex{ - ModifyIndex: 6, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckIndex, - DirEnt: structs.DirEntry{ - Key: "nope", - RaftIndex: structs.RaftIndex{ - ModifyIndex: 6, - }, - }, - }, - &structs.KVSAtomicOp{ - Op: "nope", - DirEnt: structs.DirEntry{ - Key: "foo/delete", - }, - }, - } - entries, errors := s.KVSAtomicUpdate(7, ops) - if len(errors) != len(ops) { - t.Fatalf("bad len: %d != %d", len(errors), len(ops)) - } - if len(entries) != 0 { - t.Fatalf("bad len: %d != 0", len(entries)) - } - verifyStateStore("after") - - // Make sure the errors look reasonable. - expected := []string{ - "index is stale", - "lock is already held", - "lock isn't held, or is held by another session", - "current session", - `key "nope" doesn't exist`, - "current modify index", - `key "nope" doesn't exist`, - "unknown operation", - } - if len(errors) != len(expected) { - t.Fatalf("bad len: %d != %d", len(errors), len(expected)) - } - for i, msg := range expected { - if errors[i].OpIndex != i { - t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) - } - if !strings.Contains(errors[i].Error(), msg) { - t.Fatalf("bad %d: %v", i, errors[i].Error()) - } - } -} - func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { s := testStateStore(t) @@ -1893,73 +1446,6 @@ func TestStateStore_KVS_Watches(t *testing.T) { }) }) }) - - // Verify that a basic transaction triggers multiple watches. We call - // the same underlying methods that are called above so this is more - // of a sanity check. - verifyWatch(t, s.GetKVSWatch("multi/one"), func() { - verifyWatch(t, s.GetKVSWatch("multi/two"), func() { - ops := structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/one", - Value: []byte("one"), - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/two", - Value: []byte("two"), - }, - }, - } - entries, errors := s.KVSAtomicUpdate(15, ops) - if len(entries) != len(ops) { - t.Fatalf("bad len: %d != %d", len(entries), len(ops)) - } - if len(errors) != 0 { - t.Fatalf("bad len: %d != 0", len(errors)) - } - }) - }) - - // Verify that a rolled back transaction doesn't trigger any watches. - verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() { - verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() { - ops := structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/one", - Value: []byte("one-updated"), - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "multi/two", - Value: []byte("two-updated"), - }, - }, - &structs.KVSAtomicOp{ - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "multi/nope", - Value: []byte("nope"), - }, - }, - } - entries, errors := s.KVSAtomicUpdate(16, ops) - if len(errors) != 1 { - t.Fatalf("bad len: %d != 1", len(errors)) - } - if len(entries) != 0 { - t.Fatalf("bad len: %d != 0", len(entries)) - } - }) - }) } func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { diff --git a/consul/state/txn.go b/consul/state/txn.go new file mode 100644 index 0000000000..c58f939025 --- /dev/null +++ b/consul/state/txn.go @@ -0,0 +1,123 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" +) + +func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVSOp) (*structs.TxnKVSResult, error) { + var entry *structs.DirEntry + var err error + + switch op.Verb { + case structs.KVSSet: + entry = &op.DirEnt + err = s.kvsSetTxn(tx, idx, entry, false) + + case structs.KVSDelete: + err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key) + + case structs.KVSDeleteCAS: + var ok bool + ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key) + if !ok && err == nil { + err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key) + } + + case structs.KVSDeleteTree: + err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key) + + case structs.KVSCAS: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsSetCASTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key) + } + + case structs.KVSLock: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsLockTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key) + } + + case structs.KVSUnlock: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsUnlockTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key) + } + + case structs.KVSGet: + _, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key) + + case structs.KVSCheckSession: + entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session) + + case structs.KVSCheckIndex: + entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex) + + default: + err = fmt.Errorf("unknown KVS verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == structs.KVSGet { + return &structs.TxnKVSResult{entry}, nil + } + + clone := entry.Clone() + clone.Value = nil + return &structs.TxnKVSResult{clone}, nil + } + + return nil, nil +} + +// TxnRun tries to run the given operations all inside a single transaction. If +// any of the operations fail, the entire transaction will be rolled back. +func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Dispatch all of the operations inside the transaction. + results := make(structs.TxnResults, 0, len(ops)) + errors := make(structs.TxnErrors, 0, len(ops)) + for i, op := range ops { + var result structs.TxnResult + var err error + + // Dispatch based on the type of operation. + if op.KVS != nil { + result.KVS, err = s.txnKVS(tx, idx, op.KVS) + } else { + err = fmt.Errorf("no operation specified") + } + + // Accumulate the results. + results = append(results, &result) + + // Capture any error along with the index of the operation that + // failed. + if err != nil { + errors = append(errors, &structs.TxnError{i, err.Error()}) + } + } + if len(errors) > 0 { + return nil, errors + } + + tx.Commit() + return results, nil +} diff --git a/consul/state/txn_test.go b/consul/state/txn_test.go new file mode 100644 index 0000000000..c69516a98c --- /dev/null +++ b/consul/state/txn_test.go @@ -0,0 +1,624 @@ +package state + +import ( + "reflect" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/structs" +) + +func TestStateStore_Txn_KVS(t *testing.T) { + s := testStateStore(t) + + // Create kvs results in the state store. + testSetKey(t, s, 1, "foo/delete", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/zorp", "zorp") + testSetKey(t, s, 5, "foo/update", "stale") + + // Make a real session. + testRegisterNode(t, s, 6, "node1") + session := testUUID() + if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo/new", + Value: []byte("one"), + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "foo/zorp", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDeleteCAS, + DirEnt: structs.DirEntry{ + Key: "foo/delete", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 1, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDeleteTree, + DirEnt: structs.DirEntry{ + Key: "foo/bar", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "foo/update", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 5, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + ModifyIndex: 5, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "foo/update", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "not/there", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: "", + }, + }, + }, + } + results, errors := s.TxnRun(8, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + if len(results) != len(ops) { + t.Fatalf("bad len: %d != %d", len(results), len(ops)) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/new", + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{}, // delete + &structs.TxnResult{}, // delete tree + &structs.TxnResult{}, // delete CAS + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/update", + Value: []byte("stale"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{}, // get on not/there + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{}, // get on foo/lock before it's created + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/lock", + Session: session, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/lock", + Session: session, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + }, + }, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + }, + }, + } + if len(results) != len(expected) { + t.Fatalf("bad: %v", results) + } + for i, _ := range results { + if !reflect.DeepEqual(results[i], expected[i]) { + t.Fatalf("bad %d", i) + } + } + + // Pull the resulting state store contents. + idx, actual, err := s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + entries := structs.DirEntries{ + &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/new", + Value: []byte("one"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + } + if len(actual) != len(entries) { + t.Fatalf("bad len: %d != %d", len(actual), len(entries)) + } + for i, _ := range actual { + if !reflect.DeepEqual(actual[i], entries[i]) { + t.Fatalf("bad %d", i) + } + } +} + +func TestStateStore_Txn_KVS_Rollback(t *testing.T) { + s := testStateStore(t) + + // Create kvs results in the state store. + testSetKey(t, s, 1, "foo/delete", "bar") + testSetKey(t, s, 2, "foo/update", "stale") + + testRegisterNode(t, s, 3, "node1") + session := testUUID() + if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + bogus := testUUID() + if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // This function verifies that the state store wasn't changed. + verifyStateStore := func(desc string) { + idx, actual, err := s.KVSList("") + if err != nil { + t.Fatalf("err (%s): %s", desc, err) + } + if idx != 5 { + t.Fatalf("bad index (%s): %d", desc, idx) + } + + // Make sure it looks as expected. + entries := structs.DirEntries{ + &structs.DirEntry{ + Key: "foo/delete", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + &structs.DirEntry{ + Key: "foo/lock", + Value: []byte("foo"), + LockIndex: 1, + Session: session, + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("stale"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + } + if len(actual) != len(entries) { + t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries)) + } + for i, _ := range actual { + if !reflect.DeepEqual(actual[i], entries[i]) { + t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(entries[i])) + } + } + } + verifyStateStore("initial") + + // Set up a transaction that fails every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + ModifyIndex: 1, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "nope", + Session: bogus, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 6, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "nope", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 6, + }, + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: "nope", + DirEnt: structs.DirEntry{ + Key: "foo/delete", + }, + }, + }, + } + results, errors := s.TxnRun(7, ops) + if len(errors) != len(ops) { + t.Fatalf("bad len: %d != %d", len(errors), len(ops)) + } + if len(results) != 0 { + t.Fatalf("bad len: %d != 0", len(results)) + } + verifyStateStore("after") + + // Make sure the errors look reasonable. + expected := []string{ + "index is stale", + "lock is already held", + "lock isn't held, or is held by another session", + "current session", + `key "nope" doesn't exist`, + "current modify index", + `key "nope" doesn't exist`, + "unknown KVS verb", + } + if len(errors) != len(expected) { + t.Fatalf("bad len: %d != %d", len(errors), len(expected)) + } + for i, msg := range expected { + if errors[i].OpIndex != i { + t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) + } + if !strings.Contains(errors[i].Error(), msg) { + t.Fatalf("bad %d: %v", i, errors[i].Error()) + } + } +} + +func TestStateStore_Txn_Watches(t *testing.T) { + s := testStateStore(t) + + // Verify that a basic transaction triggers multiple watches. We call + // the same underlying methods that are called above so this is more + // of a sanity check. + verifyWatch(t, s.GetKVSWatch("multi/one"), func() { + verifyWatch(t, s.GetKVSWatch("multi/two"), func() { + ops := structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "multi/one", + Value: []byte("one"), + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "multi/two", + Value: []byte("two"), + }, + }, + }, + } + results, errors := s.TxnRun(15, ops) + if len(results) != len(ops) { + t.Fatalf("bad len: %d != %d", len(results), len(ops)) + } + if len(errors) != 0 { + t.Fatalf("bad len: %d != 0", len(errors)) + } + }) + }) + + // Verify that a rolled back transaction doesn't trigger any watches. + verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() { + verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() { + ops := structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "multi/one", + Value: []byte("one-updated"), + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "multi/two", + Value: []byte("two-updated"), + }, + }, + }, + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "multi/nope", + Value: []byte("nope"), + }, + }, + }, + } + results, errors := s.TxnRun(16, ops) + if len(errors) != 1 { + t.Fatalf("bad len: %d != 1", len(errors)) + } + if len(results) != 0 { + t.Fatalf("bad len: %d != 0", len(results)) + } + }) + }) +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 3aafdca032..1ea9300819 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -36,7 +36,7 @@ const ( TombstoneRequestType CoordinateBatchUpdateType PreparedQueryRequestType - KVSAtomicRequestType + TxnRequestType ) const ( @@ -535,11 +535,11 @@ const ( KVSLock = "lock" // Lock a key KVSUnlock = "unlock" // Unlock a key - // KVSAtomic* operations are only available in KVSAtomicRequest - // transactions. - KVSAtomicGet = "get" // Read the key during the transaction. - KVSAtomicCheckSession = "check-session" // Check the session holds the key. - KVSAtomicCheckIndex = "check-index" // Check the modify index of the key. + // The following operations are only available inside of atomic + // transactions via the Txn request. + KVSGet = "get" // Read the key during the transaction. + KVSCheckSession = "check-session" // Check the session holds the key. + KVSCheckIndex = "check-index" // Check the modify index of the key. ) // KVSRequest is used to operate on the Key-Value store @@ -554,49 +554,6 @@ func (r *KVSRequest) RequestDatacenter() string { return r.Datacenter } -// KVSAtomicOp is used to define a single operation within an multi-key -// transaction. -type KVSAtomicOp struct { - Op KVSOp - DirEnt DirEntry -} - -// KVSAtomicOps is a list of atomic operations. -type KVSAtomicOps []*KVSAtomicOp - -// KVSAtomicRequest is used to perform atomic multi-key operations on the -// Key-Value store. -type KVSAtomicRequest struct { - Datacenter string - Ops KVSAtomicOps - WriteRequest -} - -func (r *KVSAtomicRequest) RequestDatacenter() string { - return r.Datacenter -} - -// KVSAtomicError is used to return information about an error for a specific -// operation. -type KVSAtomicError struct { - OpIndex int - What string -} - -// Error returns the string representation of an atomic error. -func (e KVSAtomicError) Error() string { - return fmt.Sprintf("op %d: %s", e.OpIndex, e.What) -} - -// KVSAtomicErrors is a list of KVSAtomicError entries. -type KVSAtomicErrors []*KVSAtomicError - -// KVSAtomicResponse is the structure returned by a KVSAtomicRequest. -type KVSAtomicResponse struct { - Errors KVSAtomicErrors - Results DirEntries -} - // KeyRequest is used to request a key, or key prefix type KeyRequest struct { Datacenter string diff --git a/consul/structs/txn.go b/consul/structs/txn.go new file mode 100644 index 0000000000..c7dce6bf37 --- /dev/null +++ b/consul/structs/txn.go @@ -0,0 +1,69 @@ +package structs + +import ( + "fmt" +) + +// TxnKVSOp is used to define a single operation on the KVS inside a +// transaction +type TxnKVSOp struct { + Verb KVSOp + DirEnt DirEntry +} + +// TxnKVSResult is used to define the result of a single operation on the KVS +// inside a transaction. +type TxnKVSResult struct { + DirEnt *DirEntry +} + +// TxnOp is used to define a single operation inside a transaction. Only one +// of the types should be filled out per entry. +type TxnOp struct { + KVS *TxnKVSOp +} + +// TxnOps is a list of operations within a transaction. +type TxnOps []*TxnOp + +// TxnRequest is used to apply multiple operations to the state store in a +// single transaction +type TxnRequest struct { + Datacenter string + Ops TxnOps + WriteRequest +} + +func (r *TxnRequest) RequestDatacenter() string { + return r.Datacenter +} + +// TxnError is used to return information about an error for a specific +// operation. +type TxnError struct { + OpIndex int + What string +} + +// Error returns the string representation of an atomic error. +func (e TxnError) Error() string { + return fmt.Sprintf("op %d: %s", e.OpIndex, e.What) +} + +// TxnErrors is a list of TxnError entries. +type TxnErrors []*TxnError + +// TxnResult is used to define the result of a given operation inside a +// transaction. Only one of the types should be filled out per entry. +type TxnResult struct { + KVS *TxnKVSResult +} + +// TxnResults is a list of TxnResult entries. +type TxnResults []*TxnResult + +// TxnResponse is the structure returned by a TxnRequest. +type TxnResponse struct { + Results TxnResults + Errors TxnErrors +} diff --git a/consul/txn_endpoint.go b/consul/txn_endpoint.go index beb587e947..81de6e5541 100644 --- a/consul/txn_endpoint.go +++ b/consul/txn_endpoint.go @@ -14,24 +14,26 @@ type Txn struct { } // Apply is used to apply multiple operations in a single, atomic transaction. -func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error { +func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now()) - // Perform the pre-apply checks on each of the operations. + // Perform the pre-apply checks for any KVS operations. acl, err := t.srv.resolveToken(args.Token) if err != nil { return err } for i, op := range args.Ops { - ok, err := kvsPreApply(t.srv, acl, op.Op, &op.DirEnt) - if err != nil { - reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()}) - } else if !ok { - err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key) - reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()}) + if op.KVS != nil { + ok, err := kvsPreApply(t.srv, acl, op.KVS.Verb, &op.KVS.DirEnt) + if err != nil { + reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()}) + } else if !ok { + err = fmt.Errorf("failed to lock key %q due to lock delay", op.KVS.DirEnt.Key) + reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()}) + } } } if len(reply.Errors) > 0 { @@ -39,9 +41,9 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp } // Apply the update. - resp, err := t.srv.raftApply(structs.KVSAtomicRequestType, args) + resp, err := t.srv.raftApply(structs.TxnRequestType, args) if err != nil { - t.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err) + t.srv.logger.Printf("[ERR] consul.txn: Apply failed: %v", err) return err } if respErr, ok := resp.(error); ok { @@ -50,8 +52,8 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp // Convert the return type. This should be a cheap copy since we are // just taking the two slices. - if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok { - *reply = respAtomic + if txnResp, ok := resp.(structs.TxnResponse); ok { + *reply = txnResp } else { return fmt.Errorf("unexpected return type %T", resp) } diff --git a/consul/txn_endpoint_test.go b/consul/txn_endpoint_test.go index b11caa72ea..9d4a6b1601 100644 --- a/consul/txn_endpoint_test.go +++ b/consul/txn_endpoint_test.go @@ -25,26 +25,30 @@ func TestTxn_Apply(t *testing.T) { // Do a super basic request. The state store test covers the details so // we just need to be sure that the transaction is sent correctly and // the results are converted appropriately. - arg := structs.KVSAtomicRequest{ + arg := structs.TxnRequest{ Datacenter: "dc1", - Ops: structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "test", - Flags: 42, - Value: []byte("test"), + Ops: structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "test", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "test", + }, }, }, }, } - var out structs.KVSAtomicResponse + var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -64,24 +68,32 @@ func TestTxn_Apply(t *testing.T) { } // Verify the transaction's return value. - expected := structs.KVSAtomicResponse{ - Results: structs.DirEntries{ - &structs.DirEntry{ - Key: "test", - Flags: 42, - Value: nil, - RaftIndex: structs.RaftIndex{ - CreateIndex: d.CreateIndex, - ModifyIndex: d.ModifyIndex, + expected := structs.TxnResponse{ + Results: structs.TxnResults{ + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "test", + Flags: 42, + Value: nil, + RaftIndex: structs.RaftIndex{ + CreateIndex: d.CreateIndex, + ModifyIndex: d.ModifyIndex, + }, + }, }, }, - &structs.DirEntry{ - Key: "test", - Flags: 42, - Value: []byte("test"), - RaftIndex: structs.RaftIndex{ - CreateIndex: d.CreateIndex, - ModifyIndex: d.ModifyIndex, + &structs.TxnResult{ + KVS: &structs.TxnKVSResult{ + DirEnt: &structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + RaftIndex: structs.RaftIndex{ + CreateIndex: d.CreateIndex, + ModifyIndex: d.ModifyIndex, + }, + }, }, }, }, @@ -124,81 +136,101 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { // Set up a transaction where every operation should get blocked due to // ACLs. - arg := structs.KVSAtomicRequest{ + arg := structs.TxnRequest{ Datacenter: "dc1", - Ops: structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSSet, - DirEnt: structs.DirEntry{ - Key: "foo", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSDelete, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSDeleteCAS, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDeleteCAS, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSDeleteTree, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSDeleteTree, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSCAS, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSUnlock, - DirEnt: structs.DirEntry{ - Key: "foo", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicGet, - DirEnt: structs.DirEntry{ - Key: "nope", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "nope", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckSession, - DirEnt: structs.DirEntry{ - Key: "nope", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "nope", + }, }, }, - &structs.KVSAtomicOp{ - Op: structs.KVSAtomicCheckIndex, - DirEnt: structs.DirEntry{ - Key: "nope", + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "nope", + }, }, }, }, WriteRequest: structs.WriteRequest{Token: id}, } - var out structs.KVSAtomicResponse + var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } // Verify the transaction's return value. - var expected structs.KVSAtomicResponse + var expected structs.TxnResponse for i, _ := range arg.Ops { - expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()}) + expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()}) } if !reflect.DeepEqual(out, expected) { t.Fatalf("bad %v", out) @@ -245,20 +277,22 @@ func TestTxn_Apply_LockDelay(t *testing.T) { validId := session.ID // Make a lock request via an atomic transaction. - arg := structs.KVSAtomicRequest{ + arg := structs.TxnRequest{ Datacenter: "dc1", - Ops: structs.KVSAtomicOps{ - &structs.KVSAtomicOp{ - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: "test", - Session: validId, + Ops: structs.TxnOps{ + &structs.TxnOp{ + KVS: &structs.TxnKVSOp{ + Verb: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "test", + Session: validId, + }, }, }, }, } { - var out structs.KVSAtomicResponse + var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -275,13 +309,13 @@ func TestTxn_Apply_LockDelay(t *testing.T) { // Should acquire. { - var out structs.KVSAtomicResponse + var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } if len(out.Results) != 1 || len(out.Errors) != 0 || - out.Results[0].LockIndex != 2 { + out.Results[0].KVS.DirEnt.LockIndex != 2 { t.Fatalf("bad: %v", out) } }