diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index ffed10f9c9..54f843c917 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -226,12 +226,28 @@ func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args return nil, nil } + // Check for cas value + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) + if err != nil { + return nil, err + } + applyReq.DirEnt.ModifyIndex = casVal + applyReq.Op = structs.KVSDeleteCAS + } + // Make the RPC var out bool if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { return nil, err } - return nil, nil + + // Only use the out value if this was a CAS + if applyReq.Op == structs.KVSDeleteCAS { + return out, nil + } else { + return true, nil + } } // missingKey checks if the key is missing diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index 0883cfaaac..9e2b174160 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -3,13 +3,14 @@ package agent import ( "bytes" "fmt" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/testutil" "net/http" "net/http/httptest" "os" "reflect" "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { @@ -183,6 +184,93 @@ func TestKVSEndpoint_Recurse(t *testing.T) { } } +func TestKVSEndpoint_DELETE_CAS(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/test", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + req, err := http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d := obj.(structs.DirEntries)[0] + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("DELETE", + fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex-1), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); res { + t.Fatalf("should NOT work") + } + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("DELETE", + fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + // Verify the delete + req, _ = http.NewRequest("GET", "/v1/kv/test", nil) + resp = httptest.NewRecorder() + obj, _ = srv.KVSEndpoint(resp, req) + if obj != nil { + t.Fatalf("should be destroyed") + } +} + func TestKVSEndpoint_CAS(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) diff --git a/consul/fsm.go b/consul/fsm.go index 7980b98856..e4fcb2026f 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -149,6 +149,13 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { return c.state.KVSSet(index, &req.DirEnt) case structs.KVSDelete: return c.state.KVSDelete(index, req.DirEnt.Key) + case structs.KVSDeleteCAS: + act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex) + if err != nil { + return err + } else { + return act + } case structs.KVSDeleteTree: return c.state.KVSDeleteTree(index, req.DirEnt.Key) case structs.KVSCAS: diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 280f388d18..d1ec4be72e 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -603,6 +603,66 @@ func TestFSM_KVSDeleteTree(t *testing.T) { } } +func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { + path, err := ioutil.TempDir("", "fsm") + if err != nil { + t.Fatalf("err: %v", err) + } + fsm, err := NewFSM(nil, path, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("key missing") + } + + // Run the check-and-set + req.Op = structs.KVSDeleteCAS + req.DirEnt.ModifyIndex = d.ModifyIndex + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp.(bool) != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is gone + _, d, err = fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("bad: %v", d) + } +} + func TestFSM_KVSCheckAndSet(t *testing.T) { path, err := ioutil.TempDir("", "fsm") if err != nil { diff --git a/consul/state_store.go b/consul/state_store.go index 9cf88b1646..c65152cf69 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1299,6 +1299,42 @@ func (s *StateStore) KVSDelete(index uint64, key string) error { return s.kvsDeleteWithIndex(index, "id", key) } +// KVSDeleteCheckAndSet is used to perform an atomic delete check-and-set +func (s *StateStore) KVSDeleteCheckAndSet(index uint64, key string, casIndex uint64) (bool, error) { + tx, err := s.tables.StartTxn(false) + if err != nil { + return false, err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", key) + if err != nil { + return false, err + } + + // Get the existing node if any + var exist *structs.DirEntry + if len(res) > 0 { + exist = res[0].(*structs.DirEntry) + } + + // Use the casIndex as the constraint. A modify time of 0 means + // we are doign a delete-if-not-exists (odd...), while any other + // value means we expect that modify time. + if casIndex == 0 { + return exist == nil, nil + } else if casIndex > 0 && (exist == nil || exist.ModifyIndex != casIndex) { + return false, nil + } + + // Do the actual delete + if err := s.kvsDeleteWithIndexTxn(index, tx, "id", key); err != nil { + return false, err + } + return true, tx.Commit() +} + // KVSDeleteTree is used to delete all keys with a given prefix func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { if prefix == "" { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index b456c19e97..589f7b3781 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1569,6 +1569,56 @@ func TestKVSDelete(t *testing.T) { } } +func TestKVSDeleteCheckAndSet(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // CAS should fail, no entry + ok, err := store.KVSDeleteCheckAndSet(1000, "/foo", 100) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // CAS should work, no entry + ok, err = store.KVSDeleteCheckAndSet(1000, "/foo", 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected failure") + } + + // Make an entry + d := &structs.DirEntry{Key: "/foo"} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Constrain on a wrong modify time + ok, err = store.KVSDeleteCheckAndSet(1001, "/foo", 42) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on a correct modify time + ok, err = store.KVSDeleteCheckAndSet(1002, "/foo", 1000) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected commit") + } +} + func TestKVSCheckAndSet(t *testing.T) { store, err := testStateStore() if err != nil { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index ffa44cc50d..0d4778de73 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -330,6 +330,7 @@ type KVSOp string const ( KVSSet KVSOp = "set" KVSDelete = "delete" + KVSDeleteCAS = "delete-cas" // Delete with check-and-set KVSDeleteTree = "delete-tree" KVSCAS = "cas" // Check-and-set KVSLock = "lock" // Lock a key diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index e82abc6527..84efaf5246 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -212,9 +212,17 @@ then the update has not taken place. ### DELETE method Lastly, the `DELETE` method can be used to delete a single key or all -keys sharing a prefix. If the "?recurse" query parameter is provided, -then all keys with the prefix are deleted, otherwise only the specified -key. +keys sharing a prefix. There are a number of patameters that can +be used with a DELETE request: + +* ?recurse : This is used to delete all keys which have the specified prefix. + Without this, only a key with an exact match will be deleted. + +* ?cas=\ : This flag is used to turn the `DELETE` into a Check-And-Set + operation. This is very useful as it allows clients to build more complex + synchronization primitives on top. If the index is 0, then Consul will only + delete the key if it does not already exist (noop). If the index is non-zero, then + the key is only deleted if the index matches the `ModifyIndex` of that key. ## Agent