From b39374acaed462e5a4b3dc80b4a5be3d6256c5d0 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 12 Jan 2015 17:43:54 -0800 Subject: [PATCH 1/7] api: First pass at semaphore --- api/semaphore.go | 430 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 430 insertions(+) create mode 100644 api/semaphore.go diff --git a/api/semaphore.go b/api/semaphore.go new file mode 100644 index 0000000000..01c29f2b87 --- /dev/null +++ b/api/semaphore.go @@ -0,0 +1,430 @@ +package api + +import ( + "encoding/json" + "fmt" + "path" + "strings" + "sync" + "time" +) + +const ( + // DefaultSemaphoreSessionName is the Session Name we assign if none is provided + DefaultSemaphoreSessionName = "Consul API Semaphore" + + // DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided + // when creating a new Semaphore. This is used because we do not have any + // other check to depend upon. + DefaultSemaphoreSessionTTL = "15s" + + // DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore + // acquisition is possible. This affects the minimum time it takes to cancel + // a Semaphore acquisition. + DefaultSemaphoreWaitTime = 15 * time.Second + + // DefaultSemaphoreRetryTime is how long we wait after a failed lock acquisition + // before attempting to do the lock again. This is so that once a lock-delay + // is in effect, we do not hot loop retrying the acquisition. + DefaultSemaphoreRetryTime = 5 * time.Second + + // DefaultSemaphoreKey is the key used within the prefix to + // use for coordination between all the contenders. + DefaultSemaphoreKey = "_lock" +) + +var ( + // ErrSemaphoreHeld is returned if we attempt to double lock + ErrSemaphoreHeld = fmt.Errorf("Semaphore already held") + + // ErrSemaphoreNotHeld is returned if we attempt to unlock a lock + // that we do not hold. 
+ ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held") +) + +// Semaphore is used to implement a distributed semaphore +// using the Consul KV primitives. +type Semaphore struct { + c *Client + opts *SemaphoreOptions + + isHeld bool + sessionRenew chan struct{} + lockSession string + l sync.Mutex +} + +// SemaphoreOptions is used to parameterize the Semaphore +type SemaphoreOptions struct { + Prefix string // Must be set and have write permissions + Limit int // Must be set, and be positive + Value []byte // Optional, value to associate with the contender entry + Session string // Optional, created if not specified + SessionName string // Optional, defaults to DefaultSemaphoreSessionName + SessionTTL string // Optional, defaults to DefaultSemaphoreSessionTTL +} + +// semaphoreLock is written under the DefaultSemaphoreKey and +// is used to coordinate between all the contenders. +type semaphoreLock struct { + // Limit is the integer limit of holders. This is used to + // verify that all the holders agree on the value. + Limit int + + // Holders is a list of all the semaphore holders. + // It maps the session ID to true. It is used as a set effectively. + Holders map[string]bool +} + +// SemaphorePrefix is used to create a Semaphore which will operate +// at the given KV prefix and uses the given limit for the semaphore. +// The prefix must have write privileges, and the limit must be agreed +// upon by all contenders. +func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) { + opts := &SemaphoreOptions{ + Prefix: prefix, + Limit: limit, + } + return c.SemaphoreOpts(opts) +} + +// SemaphoreOpts is used to create a Semaphore with the given options. +// The prefix must have write privileges, and the limit must be agreed +// upon by all contenders. If a Session is not provided, one will be created. 
+func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) { + if opts.Prefix == "" { + return nil, fmt.Errorf("missing prefix") + } + if opts.Limit <= 0 { + return nil, fmt.Errorf("semaphore limit must be positive") + } + if opts.SessionName == "" { + opts.SessionName = DefaultSemaphoreSessionName + } + if opts.SessionTTL == "" { + opts.SessionTTL = DefaultSemaphoreSessionTTL + } else { + if _, err := time.ParseDuration(opts.SessionTTL); err != nil { + return nil, fmt.Errorf("invalid SessionTTL: %v", err) + } + } + s := &Semaphore{ + c: c, + opts: opts, + } + return s, nil +} + +// Acquire attempts to reserve a slot in the semaphore, blocking until +// success, interrupted via the stopCh or an error is encountered. +// Providing a non-nil stopCh can be used to abort the attempt. +// On success, a channel is returned that represents our slot. +// This channel could be closed at any time due to session invalidation, +// communication errors, operator intervention, etc. It is NOT safe to +// assume that the slot is held until Release() unless the Session is specifically +// created without any associated health checks. By default Consul sessions +// prefer liveness over safety and an application must be able to handle +// the session being lost. 
+func (s *Semaphore) Acquire(stopCh chan struct{}) (chan struct{}, error) { + // Hold the lock as we try to acquire + s.l.Lock() + defer s.l.Unlock() + + // Check if we already hold the semaphore + if s.isHeld { + return nil, ErrSemaphoreHeld + } + + // Check if we need to create a session first + s.lockSession = s.opts.Session + if s.lockSession == "" { + if sess, err := s.createSession(); err != nil { + return nil, fmt.Errorf("failed to create session: %v", err) + } else { + s.sessionRenew = make(chan struct{}) + s.lockSession = sess + go s.renewSession(sess, s.sessionRenew) + + // If we fail to acquire the lock, cleanup the session + defer func() { + if !s.isHeld { + close(s.sessionRenew) + s.sessionRenew = nil + } + }() + } + } + + // Create the contender entry + kv := s.c.KV() + made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), nil) + if err != nil || !made { + return nil, fmt.Errorf("failed to make contender entry: %v", err) + } + + // Setup the query options + qOpts := &QueryOptions{ + WaitTime: DefaultSemaphoreWaitTime, + } + +WAIT: + // Check if we should quit + select { + case <-stopCh: + return nil, nil + default: + } + + // Read the prefix + pairs, meta, err := kv.List(s.opts.Prefix, qOpts) + if err != nil { + return nil, fmt.Errorf("failed to read prefix: %v", err) + } + + // Decode the lock + lockPair := s.findLock(pairs) + lock, err := s.decodeLock(lockPair) + if err != nil { + return nil, err + } + + // Verify we agree with the limit + if lock.Limit != s.opts.Limit { + return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)", + lock.Limit, s.opts.Limit) + } + + // Prune the dead holders + s.pruneDeadHolders(lock, pairs) + + // Check if the lock is held + if len(lock.Holders) >= lock.Limit { + qOpts.WaitIndex = meta.LastIndex + goto WAIT + } + + // Create a new lock with us as a holder + lock.Holders[s.lockSession] = true + newLock, err := s.encodeLock(lock, lockPair.ModifyIndex) + if err != nil { + return nil, err + } + + // 
Attempt the acquisition + didSet, _, err := kv.CAS(newLock, nil) + if err != nil { + return nil, fmt.Errorf("failed to update lock: %v", err) + } + if !didSet { + // Update failed, could have been a race with another contender, + // retry the operation + goto WAIT + } + + // Watch to ensure we maintain ownership of the slot + lockCh := make(chan struct{}) + go s.monitorLock(s.lockSession, lockCh) + + // Set that we own the lock + s.isHeld = true + + // Acquired! All done + return lockCh, nil +} + +// Release is used to voluntarily give up our semaphore slot. It is +// an error to call this if the semaphore has not been acquired. +func (s *Semaphore) Release() error { + // Hold the lock as we try to release + s.l.Lock() + defer s.l.Unlock() + + // Ensure the lock is actually held + if !s.isHeld { + return ErrSemaphoreNotHeld + } + + // Set that we no longer own the lock + s.isHeld = false + + // Stop the session renew + if s.sessionRenew != nil { + defer func() { + close(s.sessionRenew) + s.sessionRenew = nil + }() + } + + // Get and clear the lock session + lockSession := s.lockSession + s.lockSession = "" + + // Remove ourselves as a lock holder + kv := s.c.KV() + key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) +READ: + pair, _, err := kv.Get(key, nil) + if err != nil { + return err + } + if pair == nil { + pair = &KVPair{} + } + lock, err := s.decodeLock(pair) + if err != nil { + return err + } + + // Create a new lock without us as a holder + if _, ok := lock.Holders[lockSession]; ok { + delete(lock.Holders, lockSession) + newLock, err := s.encodeLock(lock, pair.ModifyIndex) + if err != nil { + return err + } + + // Swap the locks + didSet, _, err := kv.CAS(newLock, nil) + if err != nil { + return fmt.Errorf("failed to update lock: %v", err) + } + if !didSet { + goto READ + } + } + + // Destroy the contender entry + contenderKey := path.Join(s.opts.Prefix, lockSession) + if _, err := kv.Delete(contenderKey, nil); err != nil { + return err + } + return nil 
+} + +// createSession is used to create a new managed session +func (s *Semaphore) createSession() (string, error) { + session := s.c.Session() + se := &SessionEntry{ + Name: s.opts.SessionName, + TTL: s.opts.SessionTTL, + Behavior: SessionBehaviorDelete, + } + id, _, err := session.Create(se, nil) + if err != nil { + return "", err + } + return id, nil +} + +// renewSession is a long running routine that maintains a session +// by doing a periodic Session renewal. +func (s *Semaphore) renewSession(id string, doneCh chan struct{}) { + session := s.c.Session() + ttl, _ := time.ParseDuration(s.opts.SessionTTL) + for { + select { + case <-time.After(ttl / 2): + entry, _, err := session.Renew(id, nil) + if err != nil || entry == nil { + return + } + + // Handle the server updating the TTL + ttl, _ = time.ParseDuration(entry.TTL) + + case <-doneCh: + // Attempt a session destroy + session.Destroy(id, nil) + return + } + } +} + +// contenderEntry returns a formatted KVPair for the contender +func (s *Semaphore) contenderEntry(session string) *KVPair { + return &KVPair{ + Key: path.Join(s.opts.Prefix, session), + Value: s.opts.Value, + Session: session, + } +} + +// findLock is used to find the KV Pair which is used for coordination +func (s *Semaphore) findLock(pairs KVPairs) *KVPair { + key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) + for _, pair := range pairs { + if pair.Key == key { + return pair + } + } + return &KVPair{} +} + +// decodeLock is used to decode a semaphoreLock from an +// entry in Consul +func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) { + // Handle if there is no lock + if pair == nil || pair.Value == nil { + return &semaphoreLock{Limit: s.opts.Limit}, nil + } + + l := &semaphoreLock{} + if err := json.Unmarshal(pair.Value, l); err != nil { + return nil, fmt.Errorf("lock decoding failed: %v", err) + } + return l, nil +} + +// encodeLock is used to encode a semaphoreLock into a KVPair +// that can be PUT +func (s 
*Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) { + enc, err := json.Marshal(l) + if err != nil { + return nil, fmt.Errorf("lock encoding failed: %v", err) + } + pair := &KVPair{ + Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey), + Value: enc, + ModifyIndex: oldIndex, + } + return pair, nil +} + +// pruneDeadHolders is used to remove all the dead lock holders +func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) { + // Gather all the live holders + alive := make(map[string]struct{}, len(pairs)) + for _, pair := range pairs { + session := strings.TrimPrefix(pair.Key, s.opts.Prefix) + alive[session] = struct{}{} + } + + // Remove any holders that are dead + for holder := range lock.Holders { + if _, ok := alive[holder]; !ok { + delete(lock.Holders, holder) + } + } +} + +// monitorLock is a long running routine to monitor a semaphore ownership +// It closes the stopCh if we lose our slot. +func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) { + defer close(stopCh) + kv := s.c.KV() + opts := &QueryOptions{RequireConsistent: true} + key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) +WAIT: + pair, meta, err := kv.Get(key, opts) + if err != nil { + return + } + lock, err := s.decodeLock(pair) + if err != nil { + return + } + if _, ok := lock.Holders[session]; ok { + opts.WaitIndex = meta.LastIndex + goto WAIT + } +} From 8ad16ca390c4b9229248670d740524423ec3ac91 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 12 Jan 2015 18:13:52 -0800 Subject: [PATCH 2/7] api: Adding semaphore tests and fixes --- api/semaphore.go | 12 ++- api/semaphore_test.go | 184 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 api/semaphore_test.go diff --git a/api/semaphore.go b/api/semaphore.go index 01c29f2b87..be939c82dd 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -365,7 +365,10 @@ func (s *Semaphore) findLock(pairs KVPairs) *KVPair { func (s 
*Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) { // Handle if there is no lock if pair == nil || pair.Value == nil { - return &semaphoreLock{Limit: s.opts.Limit}, nil + return &semaphoreLock{ + Limit: s.opts.Limit, + Holders: make(map[string]bool), + }, nil } l := &semaphoreLock{} @@ -413,16 +416,17 @@ func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) { defer close(stopCh) kv := s.c.KV() opts := &QueryOptions{RequireConsistent: true} - key := path.Join(s.opts.Prefix, DefaultSemaphoreKey) WAIT: - pair, meta, err := kv.Get(key, opts) + pairs, meta, err := kv.List(s.opts.Prefix, opts) if err != nil { return } - lock, err := s.decodeLock(pair) + lockPair := s.findLock(pairs) + lock, err := s.decodeLock(lockPair) if err != nil { return } + s.pruneDeadHolders(lock, pairs) if _, ok := lock.Holders[session]; ok { opts.WaitIndex = meta.LastIndex goto WAIT diff --git a/api/semaphore_test.go b/api/semaphore_test.go new file mode 100644 index 0000000000..e19029af43 --- /dev/null +++ b/api/semaphore_test.go @@ -0,0 +1,184 @@ +package api + +import ( + "log" + "sync" + "testing" + "time" +) + +func TestSemaphore_AcquireRelease(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Initial release should fail + err = sema.Release() + if err != ErrSemaphoreNotHeld { + t.Fatalf("err: %v", err) + } + + // Should work + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not hold") + } + + // Double lock should fail + _, err = sema.Acquire(nil) + if err != ErrSemaphoreHeld { + t.Fatalf("err: %v", err) + } + + // Should be held + select { + case <-lockCh: + t.Fatalf("should be held") + default: + } + + // Initial release should work + err = sema.Release() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Double unlock should fail + err = sema.Release() + if err != 
ErrSemaphoreNotHeld { + t.Fatalf("err: %v", err) + } + + // Should lose resource + select { + case <-lockCh: + case <-time.After(time.Second): + t.Fatalf("should not be held") + } +} + +func TestSemaphore_ForceInvalidate(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not acquired") + } + defer sema.Release() + + go func() { + // Nuke the session, simulate an operator invalidation + // or a health check failure + session := c.Session() + session.Destroy(sema.lockSession, nil) + }() + + // Should lose slot + select { + case <-lockCh: + case <-time.After(time.Second): + t.Fatalf("should not be locked") + } +} + +func TestSemaphore_DeleteKey(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not locked") + } + defer sema.Release() + + go func() { + // Nuke the key, simulate an operator intervention + kv := c.KV() + kv.DeleteTree("test/semaphore", nil) + }() + + // Should lose leadership + select { + case <-lockCh: + case <-time.After(time.Second): + t.Fatalf("should not be locked") + } +} + +func TestSemaphore_Contend(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + wg := &sync.WaitGroup{} + acquired := make([]bool, 4) + for idx := range acquired { + wg.Add(1) + go func(idx int) { + defer wg.Done() + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work eventually, will contend + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not 
locked") + } + defer sema.Release() + log.Printf("Contender %d acquired", idx) + + // Set acquired and then leave + acquired[idx] = true + }(idx) + } + + // Wait for termination + doneCh := make(chan struct{}) + go func() { + wg.Wait() + close(doneCh) + }() + + // Wait for everybody to get a turn + select { + case <-doneCh: + case <-time.After(3 * DefaultLockRetryTime): + t.Fatalf("timeout") + } + + for idx, did := range acquired { + if !did { + t.Fatalf("contender %d never acquired", idx) + } + } +} From 5107f5d1f9a8ca52eaaa698df09d68c175c35a9c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 12 Jan 2015 18:18:32 -0800 Subject: [PATCH 3/7] api: More semaphore tests --- api/semaphore_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/api/semaphore_test.go b/api/semaphore_test.go index e19029af43..d2d1487ef6 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -182,3 +182,33 @@ func TestSemaphore_Contend(t *testing.T) { } } } + +func TestSemaphore_BadLimit(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 0) + if err == nil { + t.Fatalf("should error") + } + + sema, err = c.SemaphorePrefix("test/semaphore", 1) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + sema2, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema2.Acquire(nil) + if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" { + t.Fatalf("err: %v", err) + } +} From c19b0e2ab20d0fbda93f34e08603044a700fd6f9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 11:50:09 -0800 Subject: [PATCH 4/7] api: Enable debug output from Consul for tests --- api/api_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/api_test.go b/api/api_test.go index ddfe51a0d6..4b697ba58f 100644 --- a/api/api_test.go +++ b/api/api_test.go 
@@ -24,6 +24,7 @@ var consulConfig = `{ }, "data_dir": "%s", "bootstrap": true, + "log_level": "debug", "server": true }` @@ -73,6 +74,8 @@ func newTestServer(t *testing.T) *testServer { // Start the server cmd := exec.Command("consul", "agent", "-config-file", configFile.Name()) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { t.Fatalf("err: %s", err) } From 4a038927ee136e409f87aed4f81f2e0dd56b0cc3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 11:50:57 -0800 Subject: [PATCH 5/7] api: More reliable session check --- api/semaphore.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/semaphore.go b/api/semaphore.go index be939c82dd..88f4b59cce 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "path" - "strings" "sync" "time" ) @@ -398,8 +397,9 @@ func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) { // Gather all the live holders alive := make(map[string]struct{}, len(pairs)) for _, pair := range pairs { - session := strings.TrimPrefix(pair.Key, s.opts.Prefix) - alive[session] = struct{}{} + if pair.Session != "" { + alive[pair.Session] = struct{}{} + } } // Remove any holders that are dead From ed6abe05fbd9b31ebcc0402efbc14a7cb24390c0 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 11:51:12 -0800 Subject: [PATCH 6/7] api: Changing default semaphore key --- api/semaphore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/semaphore.go b/api/semaphore.go index 88f4b59cce..c1c467a957 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -29,7 +29,7 @@ const ( // DefaultSemaphoreKey is the key used within the prefix to // use for coordination between all the contenders. 
- DefaultSemaphoreKey = "_lock" + DefaultSemaphoreKey = ".lock" ) var ( From 8068c73b29478f7781f1ca7463e37fb681f09847 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 11:51:24 -0800 Subject: [PATCH 7/7] consul: Fixing blocking query returning old result --- consul/kvs_endpoint.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 0851c9e4f5..16140fa09b 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -157,6 +157,8 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e } else { reply.Index = index } + reply.Entries = nil + } else { // Determine the maximum affected index var maxIndex uint64