mirror of https://github.com/status-im/consul.git
Merge pull request #589 from hashicorp/f-delete-cas
Add support for DELETE with CAS
This commit is contained in:
commit
8b320b852e
|
@ -226,12 +226,28 @@ func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Check for cas value
|
||||
if _, ok := params["cas"]; ok {
|
||||
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
applyReq.DirEnt.ModifyIndex = casVal
|
||||
applyReq.Op = structs.KVSDeleteCAS
|
||||
}
|
||||
|
||||
// Make the RPC
|
||||
var out bool
|
||||
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
|
||||
// Only use the out value if this was a CAS
|
||||
if applyReq.Op == structs.KVSDeleteCAS {
|
||||
return out, nil
|
||||
} else {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// missingKey checks if the key is missing
|
||||
|
|
|
@ -3,13 +3,14 @@ package agent
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
)
|
||||
|
||||
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
|
||||
|
@ -183,6 +184,93 @@ func TestKVSEndpoint_Recurse(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("test"))
|
||||
req, err := http.NewRequest("PUT", "/v1/kv/test", buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/kv/test", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d := obj.(structs.DirEntries)[0]
|
||||
|
||||
// Create a CAS request, bad index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("zip"))
|
||||
req, err := http.NewRequest("DELETE",
|
||||
fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex-1), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); res {
|
||||
t.Fatalf("should NOT work")
|
||||
}
|
||||
}
|
||||
|
||||
// Create a CAS request, good index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("zip"))
|
||||
req, err := http.NewRequest("DELETE",
|
||||
fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the delete
|
||||
req, _ = http.NewRequest("GET", "/v1/kv/test", nil)
|
||||
resp = httptest.NewRecorder()
|
||||
obj, _ = srv.KVSEndpoint(resp, req)
|
||||
if obj != nil {
|
||||
t.Fatalf("should be destroyed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSEndpoint_CAS(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
|
|
@ -149,6 +149,13 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
|||
return c.state.KVSSet(index, &req.DirEnt)
|
||||
case structs.KVSDelete:
|
||||
return c.state.KVSDelete(index, req.DirEnt.Key)
|
||||
case structs.KVSDeleteCAS:
|
||||
act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return act
|
||||
}
|
||||
case structs.KVSDeleteTree:
|
||||
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
||||
case structs.KVSCAS:
|
||||
|
|
|
@ -603,6 +603,66 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
||||
path, err := ioutil.TempDir("", "fsm")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("key missing")
|
||||
}
|
||||
|
||||
// Run the check-and-set
|
||||
req.Op = structs.KVSDeleteCAS
|
||||
req.DirEnt.ModifyIndex = d.ModifyIndex
|
||||
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp.(bool) != true {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is gone
|
||||
_, d, err = fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
||||
path, err := ioutil.TempDir("", "fsm")
|
||||
if err != nil {
|
||||
|
|
|
@ -1299,6 +1299,42 @@ func (s *StateStore) KVSDelete(index uint64, key string) error {
|
|||
return s.kvsDeleteWithIndex(index, "id", key)
|
||||
}
|
||||
|
||||
// KVSDeleteCheckAndSet is used to perform an atomic delete check-and-set
|
||||
func (s *StateStore) KVSDeleteCheckAndSet(index uint64, key string, casIndex uint64) (bool, error) {
|
||||
tx, err := s.tables.StartTxn(false)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing node
|
||||
res, err := s.kvsTable.GetTxn(tx, "id", key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Get the existing node if any
|
||||
var exist *structs.DirEntry
|
||||
if len(res) > 0 {
|
||||
exist = res[0].(*structs.DirEntry)
|
||||
}
|
||||
|
||||
// Use the casIndex as the constraint. A modify time of 0 means
|
||||
// we are doign a delete-if-not-exists (odd...), while any other
|
||||
// value means we expect that modify time.
|
||||
if casIndex == 0 {
|
||||
return exist == nil, nil
|
||||
} else if casIndex > 0 && (exist == nil || exist.ModifyIndex != casIndex) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Do the actual delete
|
||||
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", key); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, tx.Commit()
|
||||
}
|
||||
|
||||
// KVSDeleteTree is used to delete all keys with a given prefix
|
||||
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
|
||||
if prefix == "" {
|
||||
|
|
|
@ -1569,6 +1569,56 @@ func TestKVSDelete(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKVSDeleteCheckAndSet(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// CAS should fail, no entry
|
||||
ok, err := store.KVSDeleteCheckAndSet(1000, "/foo", 100)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// CAS should work, no entry
|
||||
ok, err = store.KVSDeleteCheckAndSet(1000, "/foo", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("unexpected failure")
|
||||
}
|
||||
|
||||
// Make an entry
|
||||
d := &structs.DirEntry{Key: "/foo"}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Constrain on a wrong modify time
|
||||
ok, err = store.KVSDeleteCheckAndSet(1001, "/foo", 42)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on a correct modify time
|
||||
ok, err = store.KVSDeleteCheckAndSet(1002, "/foo", 1000)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("expected commit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSCheckAndSet(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
|
|
|
@ -330,6 +330,7 @@ type KVSOp string
|
|||
const (
|
||||
KVSSet KVSOp = "set"
|
||||
KVSDelete = "delete"
|
||||
KVSDeleteCAS = "delete-cas" // Delete with check-and-set
|
||||
KVSDeleteTree = "delete-tree"
|
||||
KVSCAS = "cas" // Check-and-set
|
||||
KVSLock = "lock" // Lock a key
|
||||
|
|
|
@ -212,9 +212,17 @@ then the update has not taken place.
|
|||
### DELETE method
|
||||
|
||||
Lastly, the `DELETE` method can be used to delete a single key or all
|
||||
keys sharing a prefix. If the "?recurse" query parameter is provided,
|
||||
then all keys with the prefix are deleted, otherwise only the specified
|
||||
key.
|
||||
keys sharing a prefix. There are a number of patameters that can
|
||||
be used with a DELETE request:
|
||||
|
||||
* ?recurse : This is used to delete all keys which have the specified prefix.
|
||||
Without this, only a key with an exact match will be deleted.
|
||||
|
||||
* ?cas=\<index\> : This flag is used to turn the `DELETE` into a Check-And-Set
|
||||
operation. This is very useful as it allows clients to build more complex
|
||||
synchronization primitives on top. If the index is 0, then Consul will only
|
||||
delete the key if it does not already exist (noop). If the index is non-zero, then
|
||||
the key is only deleted if the index matches the `ModifyIndex` of that key.
|
||||
|
||||
## <a name="agent"></a> Agent
|
||||
|
||||
|
|
Loading…
Reference in New Issue