From 94d7022a884c3831db4cba26435252478f6e8199 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 13:57:48 -0800 Subject: [PATCH 1/3] api: Add support for DeleteCAS --- api/kv.go | 33 +++++++++++++++++++++++++-------- api/kv_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/api/kv.go b/api/kv.go index 4b3ed0640f..ba74057fcc 100644 --- a/api/kv.go +++ b/api/kv.go @@ -193,27 +193,44 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti // Delete is used to delete a single key func (k *KV) Delete(key string, w *WriteOptions) (*WriteMeta, error) { - return k.deleteInternal(key, nil, w) + _, qm, err := k.deleteInternal(key, nil, w) + return qm, err +} + +// DeleteCAS is used for a Delete Check-And-Set operation. The Key +// and ModifyIndex are respected. Returns true on success or false on failures. +func (k *KV) DeleteCAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) { + params := map[string]string{ + "cas": strconv.FormatUint(p.ModifyIndex, 10), + } + return k.deleteInternal(p.Key, params, q) } // DeleteTree is used to delete all keys under a prefix func (k *KV) DeleteTree(prefix string, w *WriteOptions) (*WriteMeta, error) { - return k.deleteInternal(prefix, []string{"recurse"}, w) + _, qm, err := k.deleteInternal(prefix, map[string]string{"recurse": ""}, w) + return qm, err } -func (k *KV) deleteInternal(key string, params []string, q *WriteOptions) (*WriteMeta, error) { +func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOptions) (bool, *WriteMeta, error) { r := k.c.newRequest("DELETE", "/v1/kv/"+key) r.setWriteOptions(q) - for _, param := range params { - r.params.Set(param, "") + for param, val := range params { + r.params.Set(param, val) } rtt, resp, err := requireOK(k.c.doRequest(r)) if err != nil { - return nil, err + return false, nil, err } - resp.Body.Close() + defer resp.Body.Close() qm := &WriteMeta{} qm.RequestTime = rtt - return qm, nil + + var buf bytes.Buffer + if _, err := io.Copy(&buf, resp.Body); err != nil { + return false, nil, fmt.Errorf("Failed to read response: %v", err) + } + res := strings.Contains(string(buf.Bytes()), "true") + return res, qm, nil } diff --git a/api/kv_test.go b/api/kv_test.go index 1ed751320a..8f2b54945d 100644 --- a/api/kv_test.go +++ b/api/kv_test.go @@ -117,6 +117,51 @@ func TestClient_List_DeleteRecurse(t *testing.T) { } } +func TestClient_DeleteCAS(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + kv := c.KV() + + // Put the key + key := testKey() + value := []byte("test") + p := &KVPair{Key: key, Value: value} + if work, _, err := kv.CAS(p, nil); err != nil { + t.Fatalf("err: %v", err) + } else if !work { + t.Fatalf("CAS failure") + } + + // Get should work + pair, meta, err := kv.Get(key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if meta.LastIndex == 0 { + t.Fatalf("unexpected value: %#v", meta) + } + + // CAS update with bad index + p.ModifyIndex = 1 + if work, _, err := kv.DeleteCAS(p, nil); err != nil { + t.Fatalf("err: %v", err) + } else if work { + t.Fatalf("unexpected CAS") + } + + // CAS update with valid index + p.ModifyIndex = meta.LastIndex + if work, _, err := kv.DeleteCAS(p, nil); err != nil { + t.Fatalf("err: %v", err) + } else if !work { + t.Fatalf("unexpected CAS failure") + } +} + func TestClient_CAS(t *testing.T) { c, s := makeClient(t) defer s.stop() From 9608108e64253e684decaf1b40feb29ce2e1ebde Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 14:01:50 -0800 Subject: [PATCH 2/3] api: Adding Destroy to cleanup a lock --- api/lock.go | 44 +++++++++++++++++++++++++++++++ api/lock_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/api/lock.go b/api/lock.go index 5885702465..837304d5ff 100644 --- a/api/lock.go +++ b/api/lock.go @@ -33,6 +33,10 @@ var ( // ErrLockNotHeld is returned if we attempt to unlock a lock // that we do not hold. ErrLockNotHeld = fmt.Errorf("Lock not held") + + // ErrLockInUse is returned if we attempt to destroy a lock + // that is in use. + ErrLockInUse = fmt.Errorf("Lock in use") ) // Lock is used to implement client-side leader election. It is follows the @@ -217,6 +221,46 @@ func (l *Lock) Unlock() error { return nil } +// Destroy is used to cleanup the lock entry. It is not necessary +// to invoke. It will fail if the lock is in use. +func (l *Lock) Destroy() error { + // Hold the lock as we try to release + l.l.Lock() + defer l.l.Unlock() + + // Check if we already hold the lock + if l.isHeld { + return ErrLockHeld + } + + // Look for an existing lock + kv := l.c.KV() + pair, _, err := kv.Get(l.opts.Key, nil) + if err != nil { + return fmt.Errorf("failed to read lock: %v", err) + } + + // Nothing to do if the lock does not exist + if pair == nil { + return nil + } + + // Check if it is in use + if pair.Session != "" { + return ErrLockInUse + } + + // Attempt the delete + didRemove, _, err := kv.DeleteCAS(pair, nil) + if err != nil { + return fmt.Errorf("failed to remove lock: %v", err) + } + if !didRemove { + return ErrLockInUse + } + return nil +} + // createSession is used to create a new managed session func (l *Lock) createSession() (string, error) { session := l.c.Session() diff --git a/api/lock_test.go b/api/lock_test.go index 5dba6ca225..8b849a8b73 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -182,3 +182,71 @@ func TestLock_Contend(t *testing.T) { } } } + +func TestLock_Destroy(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + lock, err := c.LockKey("test/lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + + // Destroy should fail + if err := lock.Destroy(); err != ErrLockHeld { + t.Fatalf("err: %v", err) + } + + // Should be able to release + err = lock.Unlock() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Acquire with a different lock + l2, err := c.LockKey("test/lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + leaderCh, err = l2.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + + // Destroy should still fail + if err := lock.Destroy(); err != ErrLockInUse { + t.Fatalf("err: %v", err) + } + + // Should relese + err = l2.Unlock() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should work + err = lock.Destroy() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Double destroy should work + err = l2.Destroy() + if err != nil { + t.Fatalf("err: %v", err) + } +} From 7ed1449b6cd6819ffb8269ba4902bbf5fddd66e3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 14:18:28 -0800 Subject: [PATCH 3/3] api: Adding Destroy to cleanup a semaphore --- api/semaphore.go | 56 ++++++++++++++++++++++++++++++++++++++++++- api/semaphore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/api/semaphore.go b/api/semaphore.go index c1c467a957..17b6396c34 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -36,9 +36,13 @@ var ( // ErrSemaphoreHeld is returned if we attempt to double lock ErrSemaphoreHeld = fmt.Errorf("Semaphore already held") - // ErrSemaphoreNotHeld is returned if we attempt to unlock a lock + // ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore // that we do not hold. ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held") + + // ErrSemaphoreInUse is returned if we attempt to destroy a semaphore + // that is in use. + ErrSemaphoreInUse = fmt.Errorf("Semaphore in use") ) // Semaphore is used to implement a distributed semaphore @@ -300,6 +304,56 @@ READ: return nil } +// Destroy is used to cleanup the semaphore entry. It is not necessary +// to invoke. It will fail if the semaphore is in use. +func (s *Semaphore) Destroy() error { + // Hold the lock as we try to acquire + s.l.Lock() + defer s.l.Unlock() + + // Check if we already hold the semaphore + if s.isHeld { + return ErrSemaphoreHeld + } + + // List for the semaphore + kv := s.c.KV() + pairs, _, err := kv.List(s.opts.Prefix, nil) + if err != nil { + return fmt.Errorf("failed to read prefix: %v", err) + } + + // Find the lock pair, bail if it doesn't exist + lockPair := s.findLock(pairs) + if lockPair.ModifyIndex == 0 { + return nil + } + + // Decode the lock + lock, err := s.decodeLock(lockPair) + if err != nil { + return err + } + + // Prune the dead holders + s.pruneDeadHolders(lock, pairs) + + // Check if there are any holders + if len(lock.Holders) > 0 { + return ErrSemaphoreInUse + } + + // Attempt the delete + didRemove, _, err := kv.DeleteCAS(lockPair, nil) + if err != nil { + return fmt.Errorf("failed to remove semaphore: %v", err) + } + if !didRemove { + return ErrSemaphoreInUse + } + return nil +} + // createSession is used to create a new managed session func (s *Semaphore) createSession() (string, error) { session := s.c.Session() diff --git a/api/semaphore_test.go b/api/semaphore_test.go index d2d1487ef6..e57f2b4573 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -212,3 +212,58 @@ func TestSemaphore_BadLimit(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestSemaphore_Destroy(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + sema2, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema2.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should fail, still held + if err := sema.Destroy(); err != ErrSemaphoreHeld { + t.Fatalf("err: %v", err) + } + + err = sema.Release() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should fail, still in use + if err := sema.Destroy(); err != ErrSemaphoreInUse { + t.Fatalf("err: %v", err) + } + + err = sema2.Release() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should work + if err := sema.Destroy(); err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should work + if err := sema2.Destroy(); err != nil { + t.Fatalf("err: %v", err) + } +}