mirror of https://github.com/status-im/consul.git
Add check operations to transaction api
This commit is contained in:
parent
43d882c38e
commit
b371ea8783
|
@ -1250,6 +1250,36 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID
|
|||
return nil
|
||||
}
|
||||
|
||||
// ensureCheckCASTxn updates a check only if the existing index matches the given index.
|
||||
// Returns a bool indicating if a write happened and any error.
|
||||
func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) (bool, error) {
|
||||
// Retrieve the existing entry.
|
||||
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed health check lookup: %s", err)
|
||||
}
|
||||
|
||||
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||
// we are doing a set-if-not-exists.
|
||||
if hc.ModifyIndex == 0 && existing != nil {
|
||||
return false, nil
|
||||
}
|
||||
if hc.ModifyIndex != 0 && existing == nil {
|
||||
return false, nil
|
||||
}
|
||||
e, ok := existing.(*structs.HealthCheck)
|
||||
if ok && hc.ModifyIndex != 0 && hc.ModifyIndex != e.ModifyIndex {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Perform the update.
|
||||
if err := s.ensureCheckTxn(tx, idx, hc); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ensureCheckTransaction is used as the inner method to handle inserting
|
||||
// a health check into the state store. It ensures safety against inserting
|
||||
// checks with no matching node or service.
|
||||
|
@ -1366,6 +1396,12 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *stru
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return s.nodeCheckTxn(tx, nodeName, checkID)
|
||||
}
|
||||
|
||||
// nodeCheckTxn is used as the inner method to handle reading a health check
|
||||
// from the state store.
|
||||
func (s *Store) nodeCheckTxn(tx *memdb.Txn, nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "checks")
|
||||
|
||||
|
@ -1555,6 +1591,35 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
// deleteCheckCASTxn is used to try doing a check delete operation with a given
|
||||
// raft index. If the CAS index specified is not equal to the last bserved index for
|
||||
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
|
||||
func (s *Store) deleteCheckCASTxn(tx *memdb.Txn, idx, cidx uint64, node string, checkID types.CheckID) (bool, error) {
|
||||
// Try to retrieve the existing health check.
|
||||
hc, err := tx.First("checks", "id", node, string(checkID))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("check lookup failed: %s", err)
|
||||
}
|
||||
if hc == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If the existing index does not match the provided CAS
|
||||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
existing, ok := hc.(*structs.HealthCheck)
|
||||
if !ok || existing.ModifyIndex != cidx {
|
||||
return existing == nil, nil
|
||||
}
|
||||
|
||||
// Call the actual deletion if the above passed.
|
||||
if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// deleteCheckTxn is the inner method used to call a health
|
||||
// check deletion within an existing transaction.
|
||||
func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
|
||||
|
|
|
@ -118,10 +118,68 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention
|
|||
case structs.IntentionOpDelete:
|
||||
return s.intentionDeleteTxn(tx, idx, op.Intention.ID)
|
||||
default:
|
||||
return fmt.Errorf("unknown Intention verb %q", op.Op)
|
||||
return fmt.Errorf("unknown Intention op %q", op.Op)
|
||||
}
|
||||
}
|
||||
|
||||
// txnCheck handles all Check-related operations.
|
||||
func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
|
||||
var entry *structs.HealthCheck
|
||||
var err error
|
||||
|
||||
switch op.Verb {
|
||||
case api.CheckGet:
|
||||
_, entry, err = s.nodeCheckTxn(tx, op.Check.Node, op.Check.CheckID)
|
||||
if entry == nil && err == nil {
|
||||
err = fmt.Errorf("check %q on node %q doesn't exist", op.Check.CheckID, op.Check.Node)
|
||||
}
|
||||
|
||||
case api.CheckSet:
|
||||
entry = &op.Check
|
||||
err = s.ensureCheckTxn(tx, idx, entry)
|
||||
|
||||
case api.CheckCAS:
|
||||
var ok bool
|
||||
entry = &op.Check
|
||||
ok, err = s.ensureCheckCASTxn(tx, idx, entry)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set check %q on node %q, index is stale", entry.CheckID, entry.Node)
|
||||
}
|
||||
|
||||
case api.CheckDelete:
|
||||
err = s.deleteCheckTxn(tx, idx, op.Check.Node, op.Check.CheckID)
|
||||
|
||||
case api.CheckDeleteCAS:
|
||||
var ok bool
|
||||
ok, err = s.deleteCheckCASTxn(tx, idx, op.Check.ModifyIndex, op.Check.Node, op.Check.CheckID)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to delete check %q on node %q, index is stale", op.Check.CheckID, op.Check.Node)
|
||||
}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown Check verb %q", op.Verb)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For a GET we keep the value, otherwise we clone and blank out the
|
||||
// value (we have to clone so we don't modify the entry being used by
|
||||
// the state store).
|
||||
if entry != nil {
|
||||
if op.Verb == api.CheckGet {
|
||||
result := structs.TxnResult{Check: entry}
|
||||
return structs.TxnResults{&result}, nil
|
||||
}
|
||||
|
||||
clone := entry.Clone()
|
||||
result := structs.TxnResult{Check: clone}
|
||||
return structs.TxnResults{&result}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// txnDispatch runs the given operations inside the state store transaction.
|
||||
func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
||||
results := make(structs.TxnResults, 0, len(ops))
|
||||
|
|
|
@ -916,7 +916,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
|
|||
c.Output != other.Output ||
|
||||
c.ServiceID != other.ServiceID ||
|
||||
c.ServiceName != other.ServiceName ||
|
||||
!reflect.DeepEqual(c.ServiceTags, other.ServiceTags) {
|
||||
!reflect.DeepEqual(c.ServiceTags, other.ServiceTags) ||
|
||||
!reflect.DeepEqual(c.Definition, other.Definition) {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
)
|
||||
|
||||
// TxnKVOp is used to define a single operation on the KVS inside a
|
||||
// transaction
|
||||
// transaction.
|
||||
type TxnKVOp struct {
|
||||
Verb api.KVOp
|
||||
DirEnt DirEntry
|
||||
|
@ -19,6 +19,17 @@ type TxnKVOp struct {
|
|||
// inside a transaction.
|
||||
type TxnKVResult *DirEntry
|
||||
|
||||
// TxnCheckOp is used to define a single operation on a health check inside a
|
||||
// transaction.
|
||||
type TxnCheckOp struct {
|
||||
Verb api.CheckOp
|
||||
Check HealthCheck
|
||||
}
|
||||
|
||||
// TxnCheckResult is used to define the result of a single operation on a health
|
||||
// check inside a transaction.
|
||||
type TxnCheckResult *HealthCheck
|
||||
|
||||
// TxnKVOp is used to define a single operation on an Intention inside a
|
||||
// transaction.
|
||||
type TxnIntentionOp IntentionRequest
|
||||
|
@ -28,6 +39,7 @@ type TxnIntentionOp IntentionRequest
|
|||
type TxnOp struct {
|
||||
KV *TxnKVOp
|
||||
Intention *TxnIntentionOp
|
||||
Check *TxnCheckOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of operations within a transaction.
|
||||
|
@ -75,7 +87,8 @@ type TxnErrors []*TxnError
|
|||
// TxnResult is used to define the result of a given operation inside a
|
||||
// transaction. Only one of the types should be filled out per entry.
|
||||
type TxnResult struct {
|
||||
KV TxnKVResult
|
||||
KV TxnKVResult
|
||||
Check TxnCheckResult
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult entries.
|
||||
|
|
160
api/kv.go
160
api/kv.go
|
@ -45,44 +45,6 @@ type KVPair struct {
|
|||
// KVPairs is a list of KVPair objects
|
||||
type KVPairs []*KVPair
|
||||
|
||||
// KVOp constants give possible operations available in a KVTxn.
|
||||
type KVOp string
|
||||
|
||||
const (
|
||||
KVSet KVOp = "set"
|
||||
KVDelete KVOp = "delete"
|
||||
KVDeleteCAS KVOp = "delete-cas"
|
||||
KVDeleteTree KVOp = "delete-tree"
|
||||
KVCAS KVOp = "cas"
|
||||
KVLock KVOp = "lock"
|
||||
KVUnlock KVOp = "unlock"
|
||||
KVGet KVOp = "get"
|
||||
KVGetTree KVOp = "get-tree"
|
||||
KVCheckSession KVOp = "check-session"
|
||||
KVCheckIndex KVOp = "check-index"
|
||||
KVCheckNotExists KVOp = "check-not-exists"
|
||||
)
|
||||
|
||||
// KVTxnOp defines a single operation inside a transaction.
|
||||
type KVTxnOp struct {
|
||||
Verb KVOp
|
||||
Key string
|
||||
Value []byte
|
||||
Flags uint64
|
||||
Index uint64
|
||||
Session string
|
||||
}
|
||||
|
||||
// KVTxnOps defines a set of operations to be performed inside a single
|
||||
// transaction.
|
||||
type KVTxnOps []*KVTxnOp
|
||||
|
||||
// KVTxnResponse has the outcome of a transaction.
|
||||
type KVTxnResponse struct {
|
||||
Results []*KVPair
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// KV is used to manipulate the K/V API
|
||||
type KV struct {
|
||||
c *Client
|
||||
|
@ -300,121 +262,25 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
|
|||
return res, qm, nil
|
||||
}
|
||||
|
||||
// TxnOp is the internal format we send to Consul. It's not specific to KV,
|
||||
// though currently only KV operations are supported.
|
||||
type TxnOp struct {
|
||||
KV *KVTxnOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of transaction operations.
|
||||
type TxnOps []*TxnOp
|
||||
|
||||
// TxnResult is the internal format we receive from Consul.
|
||||
type TxnResult struct {
|
||||
KV *KVPair
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult objects.
|
||||
type TxnResults []*TxnResult
|
||||
|
||||
// TxnError is used to return information about an operation in a transaction.
|
||||
type TxnError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
|
||||
// TxnErrors is a list of TxnError objects.
|
||||
type TxnErrors []*TxnError
|
||||
|
||||
// TxnResponse is the internal format we receive from Consul.
|
||||
type TxnResponse struct {
|
||||
Results TxnResults
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// Txn is used to apply multiple KV operations in a single, atomic transaction.
|
||||
//
|
||||
// Note that Go will perform the required base64 encoding on the values
|
||||
// automatically because the type is a byte slice. Transactions are defined as a
|
||||
// list of operations to perform, using the KVOp constants and KVTxnOp structure
|
||||
// to define operations. If any operation fails, none of the changes are applied
|
||||
// to the state store. Note that this hides the internal raw transaction interface
|
||||
// and munges the input and output types into KV-specific ones for ease of use.
|
||||
// If there are more non-KV operations in the future we may break out a new
|
||||
// transaction API client, but it will be easy to keep this KV-specific variant
|
||||
// supported.
|
||||
//
|
||||
// Even though this is generally a write operation, we take a QueryOptions input
|
||||
// and return a QueryMeta output. If the transaction contains only read ops, then
|
||||
// Consul will fast-path it to a different endpoint internally which supports
|
||||
// consistency controls, but not blocking. If there are write operations then
|
||||
// the request will always be routed through raft and any consistency settings
|
||||
// will be ignored.
|
||||
//
|
||||
// Here's an example:
|
||||
//
|
||||
// ops := KVTxnOps{
|
||||
// &KVTxnOp{
|
||||
// Verb: KVLock,
|
||||
// Key: "test/lock",
|
||||
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
// Value: []byte("hello"),
|
||||
// },
|
||||
// &KVTxnOp{
|
||||
// Verb: KVGet,
|
||||
// Key: "another/key",
|
||||
// },
|
||||
// }
|
||||
// ok, response, _, err := kv.Txn(&ops, nil)
|
||||
//
|
||||
// If there is a problem making the transaction request then an error will be
|
||||
// returned. Otherwise, the ok value will be true if the transaction succeeded
|
||||
// or false if it was rolled back. The response is a structured return value which
|
||||
// will have the outcome of the transaction. Its Results member will have entries
|
||||
// for each operation. Deleted keys will have a nil entry in the, and to save
|
||||
// space, the Value of each key in the Results will be nil unless the operation
|
||||
// is a KVGet. If the transaction was rolled back, the Errors member will have
|
||||
// entries referencing the index of the operation that failed along with an error
|
||||
// message.
|
||||
// The Txn function has been deprecated from the KV object; please see the Txn
|
||||
// object for more information about Transactions.
|
||||
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
|
||||
r := k.c.newRequest("PUT", "/v1/txn")
|
||||
r.setQueryOptions(q)
|
||||
|
||||
// Convert into the internal format since this is an all-KV txn.
|
||||
ops := make(TxnOps, 0, len(txn))
|
||||
for _, kvOp := range txn {
|
||||
ops = append(ops, &TxnOp{KV: kvOp})
|
||||
ops := make(TxnOps, len(txn))
|
||||
for _, op := range txn {
|
||||
ops = append(ops, &TxnOp{KV: op})
|
||||
}
|
||||
r.obj = ops
|
||||
rtt, resp, err := k.c.doRequest(r)
|
||||
|
||||
respOk, txnResp, qm, err := k.c.txn(ops, q)
|
||||
if err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
|
||||
var txnResp TxnResponse
|
||||
if err := decodeBody(resp, &txnResp); err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
|
||||
// Convert from the internal format.
|
||||
kvResp := KVTxnResponse{
|
||||
Errors: txnResp.Errors,
|
||||
}
|
||||
for _, result := range txnResp.Results {
|
||||
kvResp.Results = append(kvResp.Results, result.KV)
|
||||
}
|
||||
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
|
||||
// Convert from the internal format.
|
||||
kvResp := KVTxnResponse{
|
||||
Errors: txnResp.Errors,
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
|
||||
for _, result := range txnResp.Results {
|
||||
kvResp.Results = append(kvResp.Results, result.KV)
|
||||
}
|
||||
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
|
||||
return respOk, &kvResp, qm, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// Txn is used to manipulate the Txn API
|
||||
type Txn struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
// Txn is used to return a handle to the K/V apis
|
||||
func (c *Client) Txn() *Txn {
|
||||
return &Txn{c}
|
||||
}
|
||||
|
||||
// TxnOp is the internal format we send to Consul. Currently only K/V and
|
||||
// check operations are supported.
|
||||
type TxnOp struct {
|
||||
KV *KVTxnOp
|
||||
Check *CheckTxnOp
|
||||
}
|
||||
|
||||
// TxnOps is a list of transaction operations.
|
||||
type TxnOps []*TxnOp
|
||||
|
||||
// TxnResult is the internal format we receive from Consul.
|
||||
type TxnResult struct {
|
||||
KV *KVPair
|
||||
Check *HealthCheck
|
||||
}
|
||||
|
||||
// TxnResults is a list of TxnResult objects.
|
||||
type TxnResults []*TxnResult
|
||||
|
||||
// TxnError is used to return information about an operation in a transaction.
|
||||
type TxnError struct {
|
||||
OpIndex int
|
||||
What string
|
||||
}
|
||||
|
||||
// TxnErrors is a list of TxnError objects.
|
||||
type TxnErrors []*TxnError
|
||||
|
||||
// TxnResponse is the internal format we receive from Consul.
|
||||
type TxnResponse struct {
|
||||
Results TxnResults
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// KVOp constants give possible operations available in a transaction.
|
||||
type KVOp string
|
||||
|
||||
const (
|
||||
KVSet KVOp = "set"
|
||||
KVDelete KVOp = "delete"
|
||||
KVDeleteCAS KVOp = "delete-cas"
|
||||
KVDeleteTree KVOp = "delete-tree"
|
||||
KVCAS KVOp = "cas"
|
||||
KVLock KVOp = "lock"
|
||||
KVUnlock KVOp = "unlock"
|
||||
KVGet KVOp = "get"
|
||||
KVGetTree KVOp = "get-tree"
|
||||
KVCheckSession KVOp = "check-session"
|
||||
KVCheckIndex KVOp = "check-index"
|
||||
KVCheckNotExists KVOp = "check-not-exists"
|
||||
)
|
||||
|
||||
// KVTxnOp defines a single operation inside a transaction.
|
||||
type KVTxnOp struct {
|
||||
Verb KVOp
|
||||
Key string
|
||||
Value []byte
|
||||
Flags uint64
|
||||
Index uint64
|
||||
Session string
|
||||
}
|
||||
|
||||
// KVTxnOps defines a set of operations to be performed inside a single
|
||||
// transaction.
|
||||
type KVTxnOps []*KVTxnOp
|
||||
|
||||
// KVTxnResponse has the outcome of a transaction.
|
||||
type KVTxnResponse struct {
|
||||
Results []*KVPair
|
||||
Errors TxnErrors
|
||||
}
|
||||
|
||||
// CheckOp constants give possible operations available in a transaction.
|
||||
type CheckOp string
|
||||
|
||||
const (
|
||||
CheckGet CheckOp = "get"
|
||||
CheckSet CheckOp = "set"
|
||||
CheckCAS CheckOp = "cas"
|
||||
CheckDelete CheckOp = "delete"
|
||||
CheckDeleteCAS CheckOp = "delete-cas"
|
||||
)
|
||||
|
||||
// CheckTxnOp defines a single operation inside a transaction.
|
||||
type CheckTxnOp struct {
|
||||
Verb CheckOp
|
||||
Check HealthCheck
|
||||
}
|
||||
|
||||
// Txn is used to apply multiple Consul operations in a single, atomic transaction.
|
||||
//
|
||||
// Note that Go will perform the required base64 encoding on the values
|
||||
// automatically because the type is a byte slice. Transactions are defined as a
|
||||
// list of operations to perform, using the different fields in the TxnOp structure
|
||||
// to define operations. If any operation fails, none of the changes are applied
|
||||
// to the state store.
|
||||
//
|
||||
// Even though this is generally a write operation, we take a QueryOptions input
|
||||
// and return a QueryMeta output. If the transaction contains only read ops, then
|
||||
// Consul will fast-path it to a different endpoint internally which supports
|
||||
// consistency controls, but not blocking. If there are write operations then
|
||||
// the request will always be routed through raft and any consistency settings
|
||||
// will be ignored.
|
||||
//
|
||||
// Here's an example:
|
||||
//
|
||||
// ops := KVTxnOps{
|
||||
// &KVTxnOp{
|
||||
// Verb: KVLock,
|
||||
// Key: "test/lock",
|
||||
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
// Value: []byte("hello"),
|
||||
// },
|
||||
// &KVTxnOp{
|
||||
// Verb: KVGet,
|
||||
// Key: "another/key",
|
||||
// },
|
||||
// &CheckTxnOp{
|
||||
// Verb: CheckSet,
|
||||
// HealthCheck: HealthCheck{
|
||||
// Node: "foo",
|
||||
// CheckID: "redis:a",
|
||||
// Name: "Redis Health Check",
|
||||
// Status: "passing",
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
// ok, response, _, err := kv.Txn(&ops, nil)
|
||||
//
|
||||
// If there is a problem making the transaction request then an error will be
|
||||
// returned. Otherwise, the ok value will be true if the transaction succeeded
|
||||
// or false if it was rolled back. The response is a structured return value which
|
||||
// will have the outcome of the transaction. Its Results member will have entries
|
||||
// for each operation. For KV operations, Deleted keys will have a nil entry in the
|
||||
// results, and to save space, the Value of each key in the Results will be nil
|
||||
// unless the operation is a KVGet. If the transaction was rolled back, the Errors
|
||||
// member will have entries referencing the index of the operation that failed
|
||||
// along with an error message.
|
||||
func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
|
||||
return t.c.txn(txn, q)
|
||||
}
|
||||
|
||||
func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
|
||||
r := c.newRequest("PUT", "/v1/txn")
|
||||
r.setQueryOptions(q)
|
||||
|
||||
// Convert into the internal txn format.
|
||||
ops := make(TxnOps, 0, len(txn))
|
||||
for _, kvOp := range txn {
|
||||
switch {
|
||||
case kvOp.KV != nil:
|
||||
ops = append(ops, &TxnOp{KV: kvOp.KV})
|
||||
case kvOp.Check != nil:
|
||||
ops = append(ops, &TxnOp{Check: kvOp.Check})
|
||||
}
|
||||
}
|
||||
r.obj = ops
|
||||
rtt, resp, err := c.doRequest(r)
|
||||
if err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
|
||||
var txnResp TxnResponse
|
||||
if err := decodeBody(resp, &txnResp); err != nil {
|
||||
return false, nil, nil, err
|
||||
}
|
||||
|
||||
return resp.StatusCode == http.StatusOK, &txnResp, qm, nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
|
||||
}
|
||||
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
|
||||
}
|
Loading…
Reference in New Issue