From 6131fad0686a6971308651a77d4e67988bb0f0c4 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 May 2014 16:14:03 -0700 Subject: [PATCH] agent: Adding locking support to KV store --- command/agent/kvs_endpoint.go | 12 +++++ command/agent/kvs_endpoint_test.go | 70 ++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go index 7791d1e9f3..2adaae3528 100644 --- a/command/agent/kvs_endpoint.go +++ b/command/agent/kvs_endpoint.go @@ -156,6 +156,18 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s applyReq.Op = structs.KVSCAS } + // Check for lock acquisition + if _, ok := params["acquire"]; ok { + applyReq.DirEnt.Session = params.Get("acquire") + applyReq.Op = structs.KVSLock + } + + // Check for lock release + if _, ok := params["release"]; ok { + applyReq.DirEnt.Session = params.Get("release") + applyReq.Op = structs.KVSUnlock + } + // Check the content-length if req.ContentLength > maxKVSize { resp.WriteHeader(413) diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index 24ec51372f..867b2a01f4 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -339,3 +339,73 @@ func TestKVSEndpoint_ListKeys(t *testing.T) { } } } + +func TestKVSEndpoint_AcquireRelease(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + // Acquire the lock + id := makeTestSession(t, srv) + req, err := http.NewRequest("PUT", + "/v1/kv/test?acquire="+id, bytes.NewReader(nil)) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if res := obj.(bool); !res { + t.Fatalf("should work") + } + + // Verify we have the lock + req, err = http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d := obj.(structs.DirEntries)[0] + + // Check the flags + if d.Session != id { + t.Fatalf("bad: %v", d) + } + + // Release the lock + req, err = http.NewRequest("PUT", + "/v1/kv/test?release="+id, bytes.NewReader(nil)) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if res := obj.(bool); !res { + t.Fatalf("should work") + } + + // Verify we do not have the lock + req, err = http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d = obj.(structs.DirEntries)[0] + + // Check the flags + if d.Session != "" { + t.Fatalf("bad: %v", d) + } + }) +}