From 14691f7e2966f9c5626875b8bb5f47361f1f838e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 15:32:19 -1000 Subject: [PATCH] api: Detect conflicting existing values for lock/semaphore --- api/api_test.go | 1 + api/lock.go | 18 ++++++++++++++++++ api/lock_test.go | 37 +++++++++++++++++++++++++++++++++++++ api/semaphore.go | 19 ++++++++++++++++++- api/semaphore_test.go | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 1 deletion(-) diff --git a/api/api_test.go b/api/api_test.go index 4b697ba58f..1c2a67d231 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -22,6 +22,7 @@ var consulConfig = `{ "serf_wan": 18400, "server": 18000 }, + "bind_addr": "127.0.0.1", "data_dir": "%s", "bootstrap": true, "log_level": "debug", diff --git a/api/lock.go b/api/lock.go index 83f7b2ca5c..b8abbd476d 100644 --- a/api/lock.go +++ b/api/lock.go @@ -24,6 +24,11 @@ const ( // before attempting to do the lock again. This is so that once a lock-delay // is in affect, we do not hot loop retrying the acquisition. DefaultLockRetryTime = 5 * time.Second + + // LockFlagValue is a magic flag we set to indicate a key + // is being used for a semaphore. It is used to detect a potential + // conflict with a lock. + LockFlagValue = 0x2ddccbc058a50c18 ) var ( @@ -37,6 +42,10 @@ var ( // ErrLockInUse is returned if we attempt to destroy a lock // that is in use. ErrLockInUse = fmt.Errorf("Lock in use") + + // ErrLockConflict is returned if the flags on a key + // used for a lock do not match expectation + ErrLockConflict = fmt.Errorf("Existing key does not match lock use") ) // Lock is used to implement client-side leader election. It is follows the @@ -152,6 +161,9 @@ WAIT: if err != nil { return nil, fmt.Errorf("failed to read lock: %v", err) } + if pair != nil && pair.Flags != LockFlagValue { + return nil, ErrLockConflict + } if pair != nil && pair.Session != "" { qOpts.WaitIndex = meta.LastIndex goto WAIT @@ -245,6 +257,11 @@ func (l *Lock) Destroy() error { return nil } + // Check for possible flag conflict + if pair.Flags != LockFlagValue { + return ErrLockConflict + } + // Check if it is in use if pair.Session != "" { return ErrLockInUse @@ -281,6 +298,7 @@ func (l *Lock) lockEntry(session string) *KVPair { Key: l.opts.Key, Value: l.opts.Value, Session: session, + Flags: LockFlagValue, } } diff --git a/api/lock_test.go b/api/lock_test.go index 8b849a8b73..a4aea7349c 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -250,3 +250,40 @@ func TestLock_Destroy(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestLock_Conflict(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/lock/", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not hold") + } + defer sema.Release() + + lock, err := c.LockKey("test/lock/.lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should conflict with semaphore + _, err = lock.Lock(nil) + if err != ErrLockConflict { + t.Fatalf("err: %v", err) + } + + // Should conflict with semaphore + err = lock.Destroy() + if err != ErrLockConflict { + t.Fatalf("err: %v", err) + } +} diff --git a/api/semaphore.go b/api/semaphore.go index f62af92f3c..7139c40dba 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -30,6 +30,11 @@ const ( // DefaultSemaphoreKey is the key used within the prefix to // use for coordination between all the contenders. DefaultSemaphoreKey = ".lock" + + // SemaphoreFlagValue is a magic flag we set to indicate a key + // is being used for a semaphore. It is used to detect a potential + // conflict with a lock. + SemaphoreFlagValue = 0xe0f69a2baa414de0 ) var ( @@ -43,6 +48,10 @@ var ( // ErrSemaphoreInUse is returned if we attempt to destroy a semaphore // that is in use. ErrSemaphoreInUse = fmt.Errorf("Semaphore in use") + + // ErrSemaphoreConflict is returned if the flags on a key + // used for a semaphore do not match expectation + ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use") ) // Semaphore is used to implement a distributed semaphore @@ -186,6 +195,9 @@ WAIT: // Decode the lock lockPair := s.findLock(pairs) + if lockPair.Flags != SemaphoreFlagValue { + return nil, ErrSemaphoreConflict + } lock, err := s.decodeLock(lockPair) if err != nil { return nil, err @@ -328,6 +340,9 @@ func (s *Semaphore) Destroy() error { if lockPair.ModifyIndex == 0 { return nil } + if lockPair.Flags != SemaphoreFlagValue { + return ErrSemaphoreConflict + } // Decode the lock lock, err := s.decodeLock(lockPair) @@ -399,6 +414,7 @@ func (s *Semaphore) contenderEntry(session string) *KVPair { Key: path.Join(s.opts.Prefix, session), Value: s.opts.Value, Session: session, + Flags: SemaphoreFlagValue, } } @@ -410,7 +426,7 @@ func (s *Semaphore) findLock(pairs KVPairs) *KVPair { return pair } } - return &KVPair{} + return &KVPair{Flags: SemaphoreFlagValue} } // decodeLock is used to decode a semaphoreLock from an @@ -441,6 +457,7 @@ func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, erro pair := &KVPair{ Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey), Value: enc, + Flags: SemaphoreFlagValue, ModifyIndex: oldIndex, } return pair, nil diff --git a/api/semaphore_test.go b/api/semaphore_test.go index e57f2b4573..b931d25938 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -267,3 +267,40 @@ func TestSemaphore_Destroy(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestSemaphore_Conflict(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + lock, err := c.LockKey("test/sema/.lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + defer lock.Unlock() + + sema, err := c.SemaphorePrefix("test/sema/", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should conflict with lock + _, err = sema.Acquire(nil) + if err != ErrSemaphoreConflict { + t.Fatalf("err: %v", err) + } + + // Should conflict with lock + err = sema.Destroy() + if err != ErrSemaphoreConflict { + t.Fatalf("err: %v", err) + } +}