From 4fef14163d4983027c8dd4d7484091e9432f4089 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 14:37:36 -1000 Subject: [PATCH 01/12] api: Make channels receive only --- api/lock.go | 2 +- api/semaphore.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/lock.go b/api/lock.go index 837304d5ff..83f7b2ca5c 100644 --- a/api/lock.go +++ b/api/lock.go @@ -103,7 +103,7 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) { // created without any associated health checks. By default Consul sessions // prefer liveness over safety and an application must be able to handle // the lock being lost. -func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) { +func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { // Hold the lock as we try to acquire l.l.Lock() defer l.l.Unlock() diff --git a/api/semaphore.go b/api/semaphore.go index 17b6396c34..f62af92f3c 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -128,7 +128,7 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) { // created without any associated health checks. By default Consul sessions // prefer liveness over safety and an application must be able to handle // the session being lost. -func (s *Semaphore) Acquire(stopCh chan struct{}) (chan struct{}, error) { +func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) { // Hold the lock as we try to acquire s.l.Lock() defer s.l.Unlock() From f2f980f5bc5baf7187a63edfb9f6248770a38c71 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 14:37:48 -1000 Subject: [PATCH 02/12] command/lock: First pass at lock --- command/lock.go | 335 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 command/lock.go diff --git a/command/lock.go b/command/lock.go new file mode 100644 index 0000000000..3a57d98a80 --- /dev/null +++ b/command/lock.go @@ -0,0 +1,335 @@ +package command + +import ( + "flag" + "fmt" + "os" + "path" + "strings" + "sync" + "syscall" + "time" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/agent" + "github.com/mitchellh/cli" +) + +const ( + // lockKillGracePeriod is how long we allow a child between + // a SIGTERM and a SIGKILL. This is to let the child cleanup + // any necessary state. We have to balance this with the risk + // of a split-brain where multiple children may be acting as if + // they hold a lock. This value is currently based on the default + // lock-delay value of 15 seconds. This only affects locks and not + // semaphores. + lockKillGracePeriod = 5 * time.Second +) + +// LockCommand is a Command implementation that is used to setup +// a "lock" which manages lock acquasition and invokes a sub-process +type LockCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui + + child *os.Process + childLock sync.Mutex + verbose bool +} + +func (c *LockCommand) Help() string { + helpText := ` +Usage: consul lock [options] prefix child... + + Acquires a lock or semaphore at a given path, and invokes a child + process when successful. The child process can assume the lock is + held while it executes. If the lock is lost or communication is disrupted + the child process will be sent a SIGTERM signal and given time to + gracefully exit. After the grace period expires the process will + be hard terminated. + + When -n=1, only a single lock holder or leader exists providing + mutual exclusion. Setting a higher value switches to a semaphore + allowing multiple holders to coordinate. + + The prefix provided must have write privileges. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -n=1 Maximum number of allowed lock holders. If this + value is one, it operates as a lock, otherwise + a semaphore is used. + -name="" Optional name to associate with lock session. + -token="" ACL token to use. Defaults to that of agent. + -verbose Enables verbose output +` + return strings.TrimSpace(helpText) +} + +func (c *LockCommand) Run(args []string) int { + var name, token string + var limit int + cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.IntVar(&limit, "n", 1, "") + cmdFlags.StringVar(&name, "name", "", "") + cmdFlags.StringVar(&token, "token", "", "") + cmdFlags.BoolVar(&c.verbose, "verbose", false, "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Check the limit + if limit <= 0 { + c.Ui.Error(fmt.Sprintf("Lock holder limit must be positive")) + return 1 + } + + // Verify the prefix and child are provided + extra := cmdFlags.Args() + if len(extra) < 2 { + c.Ui.Error("Key prefix and child command must be specified") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + prefix := extra[0] + script := strings.Join(extra[1:], " ") + + // Create and test the HTTP client + client, err := HTTPClient(*httpAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Setup the lock or semaphore + var lu *LockUnlock + if limit == 1 { + lu, err = c.setupLock(client, prefix, name) + } else { + lu, err = c.setupSemaphore(client, limit, prefix, name) + } + if err != nil { + c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err)) + return 1 + } + + // Attempt the acquisition + if c.verbose { + c.Ui.Info("Attempting lock acquisition") + } + lockCh, err := lu.lockFn(c.ShutdownCh) + if err != nil || lockCh == nil { + c.Ui.Error(fmt.Sprintf("Lock acquisition failed: %s", err)) + return 1 + } + + // Start the child process + childDone := make(chan struct{}) + go func() { + if err := c.startChild(script, childDone); err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + } + }() + + // Monitor for shutdown, child termination, or lock loss + select { + case <-c.ShutdownCh: + if c.verbose { + c.Ui.Info("Shutdown triggered, killing child") + } + case <-lockCh: + if c.verbose { + c.Ui.Info("Lock lost, killing child") + } + case <-childDone: + if c.verbose { + c.Ui.Info("Child terminated, releasing lock") + } + goto RELEASE + } + + // Kill the child + if err := c.killChild(childDone); err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + } + +RELEASE: + // Release the lock before termination + if err := lu.unlockFn(); err != nil { + c.Ui.Error(fmt.Sprintf("Lock release failed: %s", err)) + return 1 + } + + // Cleanup the lock if no longer in use + if err := lu.cleanupFn(); err != nil { + if err != lu.inUseErr { + c.Ui.Error(fmt.Sprintf("Lock cleanup failed: %s", err)) + return 1 + } else if c.verbose { + c.Ui.Info("Cleanup aborted, lock in use") + } + } else if c.verbose { + c.Ui.Info("Cleanup succeeded") + } + return 0 +} + +// setupLock is used to setup a new Lock given the API client, +// the key prefix to operate on, and an optional session name. +func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockUnlock, error) { + key := path.Join(prefix, "lock") + if c.verbose { + c.Ui.Info(fmt.Sprintf("Setting up lock at path: %s", key)) + } + opts := api.LockOptions{ + Key: key, + SessionName: name, + } + l, err := client.LockOpts(&opts) + if err != nil { + return nil, err + } + lu := &LockUnlock{ + lockFn: l.Lock, + unlockFn: l.Unlock, + cleanupFn: l.Destroy, + inUseErr: api.ErrLockInUse, + } + return lu, nil +} + +// setupSemaphore is used to setup a new Semaphore given the +// API client, key prefix, session name, and slot holder limit. +func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string) (*LockUnlock, error) { + if c.verbose { + c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix)) + } + opts := api.SemaphoreOptions{ + Prefix: prefix, + Limit: limit, + SessionName: name, + } + s, err := client.SemaphoreOpts(&opts) + if err != nil { + return nil, err + } + lu := &LockUnlock{ + lockFn: s.Acquire, + unlockFn: s.Release, + cleanupFn: s.Destroy, + inUseErr: api.ErrSemaphoreInUse, + } + return lu, nil +} + +// startChild is a long running routine used to start and +// wait for the child process to exit. +func (c *LockCommand) startChild(script string, doneCh chan struct{}) error { + defer close(doneCh) + if c.verbose { + c.Ui.Info(fmt.Sprintf("Starting handler '%s'", script)) + } + // Create the command + cmd, err := agent.ExecScript(script) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err)) + return err + } + + // Setup the command streams + cmd.Env = append(os.Environ(), + "CONSUL_LOCK_HELD=true", + ) + cmd.Stdin = nil + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Start the child process + if err := cmd.Start(); err != nil { + c.Ui.Error(fmt.Sprintf("Error starting handler: %s", err)) + return err + } + + // Setup the child info + c.childLock.Lock() + c.child = cmd.Process + c.childLock.Unlock() + + // Wait for the child process + if err := cmd.Wait(); err != nil { + c.Ui.Error(fmt.Sprintf("Error running handler: %s", err)) + return err + } + return nil +} + +// killChild is used to forcefully kill the child, first using SIGTERM +// to allow for a graceful cleanup and then using SIGKILL for a hard +// termination. +func (c *LockCommand) killChild(childDone chan struct{}) error { + // Get the child process + c.childLock.Lock() + child := c.child + c.childLock.Unlock() + + // If there is no child process (failed to start), we can quit early + if child == nil { + if c.verbose { + c.Ui.Info("No child process to kill") + } + return nil + } + + // Attempt a SIGTERM first + if c.verbose { + c.Ui.Info(fmt.Sprintf("Sending SIGTERM to child pid %d", child.Pid)) + } + if err := syscall.Kill(child.Pid, syscall.SIGTERM); err != nil { + return fmt.Errorf("Failed to terminate %d: %v", child.Pid, err) + } + + // Wait for termination, or until a timeout + select { + case <-childDone: + if c.verbose { + c.Ui.Info("Child exited after SIGTERM") + } + return nil + case <-time.After(lockKillGracePeriod): + if c.verbose { + c.Ui.Info(fmt.Sprintf("Child did not exit after grace period of %v", + lockKillGracePeriod)) + } + } + + // Send a final SIGKILL first + if c.verbose { + c.Ui.Info(fmt.Sprintf("Sending SIGKILL to child pid %d", child.Pid)) + } + if err := syscall.Kill(child.Pid, syscall.SIGKILL); err != nil { + return fmt.Errorf("Failed to kill %d: %v", child.Pid, err) + } + return nil +} + +func (c *LockCommand) Synopsis() string { + return "Execute a command holding a lock" +} + +// LockUnlock is used to abstract over the differences between +// a lock and a semaphore. +type LockUnlock struct { + lockFn func(<-chan struct{}) (<-chan struct{}, error) + unlockFn func() error + cleanupFn func() error + inUseErr error +} From 06249f0ee654e67ba0913af770d2c934c0b259c1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 14:37:58 -1000 Subject: [PATCH 03/12] Adding new command 'lock' --- commands.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/commands.go b/commands.go index 8f39afa107..13721975ea 100644 --- a/commands.go +++ b/commands.go @@ -69,6 +69,13 @@ func init() { }, nil }, + "lock": func() (cli.Command, error) { + return &command.LockCommand{ + ShutdownCh: makeShutdownCh(), + Ui: ui, + }, nil + }, + "members": func() (cli.Command, error) { return &command.MembersCommand{ Ui: ui, From 08c0a81e4f40c83e7a0f3cad95c34fa3d592f9ba Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 15:26:17 -1000 Subject: [PATCH 04/12] command/lock: Ensure a conflict between lock and semaphore with shared prefix --- command/lock.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/command/lock.go b/command/lock.go index 3a57d98a80..cbe5bcdb71 100644 --- a/command/lock.go +++ b/command/lock.go @@ -186,7 +186,10 @@ RELEASE: // setupLock is used to setup a new Lock given the API client, // the key prefix to operate on, and an optional session name. func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockUnlock, error) { - key := path.Join(prefix, "lock") + // Use the DefaultSemaphoreKey extention, this way if a lock and + // semaphore are both used at the same prefix, we will get a conflict + // which we can report to the user. + key := path.Join(prefix, api.DefaultSemaphoreKey) if c.verbose { c.Ui.Info(fmt.Sprintf("Setting up lock at path: %s", key)) } From 14691f7e2966f9c5626875b8bb5f47361f1f838e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 15:32:19 -1000 Subject: [PATCH 05/12] api: Detect conflicting existing values for lock/semaphore --- api/api_test.go | 1 + api/lock.go | 18 ++++++++++++++++++ api/lock_test.go | 37 +++++++++++++++++++++++++++++++++++++ api/semaphore.go | 19 ++++++++++++++++++- api/semaphore_test.go | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 1 deletion(-) diff --git a/api/api_test.go b/api/api_test.go index 4b697ba58f..1c2a67d231 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -22,6 +22,7 @@ var consulConfig = `{ "serf_wan": 18400, "server": 18000 }, + "bind_addr": "127.0.0.1", "data_dir": "%s", "bootstrap": true, "log_level": "debug", diff --git a/api/lock.go b/api/lock.go index 83f7b2ca5c..b8abbd476d 100644 --- a/api/lock.go +++ b/api/lock.go @@ -24,6 +24,11 @@ const ( // before attempting to do the lock again. This is so that once a lock-delay // is in affect, we do not hot loop retrying the acquisition. DefaultLockRetryTime = 5 * time.Second + + // LockFlagValue is a magic flag we set to indicate a key + // is being used for a semaphore. It is used to detect a potential + // conflict with a lock. + LockFlagValue = 0x2ddccbc058a50c18 ) var ( @@ -37,6 +42,10 @@ var ( // ErrLockInUse is returned if we attempt to destroy a lock // that is in use. ErrLockInUse = fmt.Errorf("Lock in use") + + // ErrLockConflict is returned if the flags on a key + // used for a lock do not match expectation + ErrLockConflict = fmt.Errorf("Existing key does not match lock use") ) // Lock is used to implement client-side leader election. It is follows the @@ -152,6 +161,9 @@ WAIT: if err != nil { return nil, fmt.Errorf("failed to read lock: %v", err) } + if pair != nil && pair.Flags != LockFlagValue { + return nil, ErrLockConflict + } if pair != nil && pair.Session != "" { qOpts.WaitIndex = meta.LastIndex goto WAIT @@ -245,6 +257,11 @@ func (l *Lock) Destroy() error { return nil } + // Check for possible flag conflict + if pair.Flags != LockFlagValue { + return ErrLockConflict + } + // Check if it is in use if pair.Session != "" { return ErrLockInUse @@ -281,6 +298,7 @@ func (l *Lock) lockEntry(session string) *KVPair { Key: l.opts.Key, Value: l.opts.Value, Session: session, + Flags: LockFlagValue, } } diff --git a/api/lock_test.go b/api/lock_test.go index 8b849a8b73..a4aea7349c 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -250,3 +250,40 @@ func TestLock_Destroy(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestLock_Conflict(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/lock/", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + lockCh, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if lockCh == nil { + t.Fatalf("not hold") + } + defer sema.Release() + + lock, err := c.LockKey("test/lock/.lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should conflict with semaphore + _, err = lock.Lock(nil) + if err != ErrLockConflict { + t.Fatalf("err: %v", err) + } + + // Should conflict with semaphore + err = lock.Destroy() + if err != ErrLockConflict { + t.Fatalf("err: %v", err) + } +} diff --git a/api/semaphore.go b/api/semaphore.go index f62af92f3c..7139c40dba 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -30,6 +30,11 @@ const ( // DefaultSemaphoreKey is the key used within the prefix to // use for coordination between all the contenders. DefaultSemaphoreKey = ".lock" + + // SemaphoreFlagValue is a magic flag we set to indicate a key + // is being used for a semaphore. It is used to detect a potential + // conflict with a lock. + SemaphoreFlagValue = 0xe0f69a2baa414de0 ) var ( @@ -43,6 +48,10 @@ var ( // ErrSemaphoreInUse is returned if we attempt to destroy a semaphore // that is in use. ErrSemaphoreInUse = fmt.Errorf("Semaphore in use") + + // ErrSemaphoreConflict is returned if the flags on a key + // used for a semaphore do not match expectation + ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use") ) // Semaphore is used to implement a distributed semaphore @@ -186,6 +195,9 @@ WAIT: // Decode the lock lockPair := s.findLock(pairs) + if lockPair.Flags != SemaphoreFlagValue { + return nil, ErrSemaphoreConflict + } lock, err := s.decodeLock(lockPair) if err != nil { return nil, err @@ -328,6 +340,9 @@ func (s *Semaphore) Destroy() error { if lockPair.ModifyIndex == 0 { return nil } + if lockPair.Flags != SemaphoreFlagValue { + return ErrSemaphoreConflict + } // Decode the lock lock, err := s.decodeLock(lockPair) @@ -399,6 +414,7 @@ func (s *Semaphore) contenderEntry(session string) *KVPair { Key: path.Join(s.opts.Prefix, session), Value: s.opts.Value, Session: session, + Flags: SemaphoreFlagValue, } } @@ -410,7 +426,7 @@ func (s *Semaphore) findLock(pairs KVPairs) *KVPair { return pair } } - return &KVPair{} + return &KVPair{Flags: SemaphoreFlagValue} } // decodeLock is used to decode a semaphoreLock from an @@ -441,6 +457,7 @@ func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, erro pair := &KVPair{ Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey), Value: enc, + Flags: SemaphoreFlagValue, ModifyIndex: oldIndex, } return pair, nil diff --git a/api/semaphore_test.go b/api/semaphore_test.go index e57f2b4573..b931d25938 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -267,3 +267,40 @@ func TestSemaphore_Destroy(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestSemaphore_Conflict(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + lock, err := c.LockKey("test/sema/.lock") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should work + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + defer lock.Unlock() + + sema, err := c.SemaphorePrefix("test/sema/", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should conflict with lock + _, err = sema.Acquire(nil) + if err != ErrSemaphoreConflict { + t.Fatalf("err: %v", err) + } + + // Should conflict with lock + err = sema.Destroy() + if err != ErrSemaphoreConflict { + t.Fatalf("err: %v", err) + } +} From 876de111bff950e73a8cd2d240a2d8efad3b166b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 15:38:00 -1000 Subject: [PATCH 06/12] command/lock: Calculate name, use provided token --- command/lock.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/command/lock.go b/command/lock.go index cbe5bcdb71..96b7dc1247 100644 --- a/command/lock.go +++ b/command/lock.go @@ -98,8 +98,16 @@ func (c *LockCommand) Run(args []string) int { prefix := extra[0] script := strings.Join(extra[1:], " ") + // Calculate a session name if none provided + if name == "" { + name = fmt.Sprintf("Consul lock for '%s' at '%s'", script, prefix) + } + // Create and test the HTTP client - client, err := HTTPClient(*httpAddr) + conf := api.DefaultConfig() + conf.Address = *httpAddr + conf.Token = token + client, err := api.NewClient(conf) if err != nil { c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) return 1 From d6837e2f31704dd615184934fefe8cf5643dc566 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 15:47:17 -1000 Subject: [PATCH 07/12] command/lock: Adding simple test --- command/lock_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 command/lock_test.go diff --git a/command/lock_test.go b/command/lock_test.go new file mode 100644 index 0000000000..df3f029867 --- /dev/null +++ b/command/lock_test.go @@ -0,0 +1,37 @@ +package command + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "testing" + + "github.com/mitchellh/cli" +) + +func TestLockCommand_implements(t *testing.T) { + var _ cli.Command = &LockCommand{} +} + +func TestLockCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := &LockCommand{Ui: ui} + filePath := filepath.Join(a1.dir, "test_touch") + touchCmd := fmt.Sprintf("touch '%s'", filePath) + args := []string{"-http-addr=" + a1.httpAddr, "test/prefix", touchCmd} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // Check for the file + _, err := ioutil.ReadFile(filePath) + if err != nil { + t.Fatalf("err: %v", err) + } +} From 3af280213cb8b0eb9870a823ab5f7b9eb9d7a697 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 16:43:24 -1000 Subject: [PATCH 08/12] website: Document a distributed semaphore --- .../source/docs/guides/index.html.markdown | 2 + .../docs/guides/semaphore.html.markdown | 133 ++++++++++++++++++ website/source/layouts/docs.erb | 4 + 3 files changed, 139 insertions(+) create mode 100644 website/source/docs/guides/semaphore.html.markdown diff --git a/website/source/docs/guides/index.html.markdown b/website/source/docs/guides/index.html.markdown index af804320b0..6e618b0483 100644 --- a/website/source/docs/guides/index.html.markdown +++ b/website/source/docs/guides/index.html.markdown @@ -27,3 +27,5 @@ The following guides are available: * [Multiple Datacenters](/docs/guides/datacenters.html) - Configuring Consul to support multiple datacenters. * [Outage Recovery](/docs/guides/outage.html) - This guide covers recovering a cluster that has become unavailable due to server failures. + +* [Semaphore](/docs/guides/semaphore.html) - This guide covers using the Key/Value store to implement a semaphore. diff --git a/website/source/docs/guides/semaphore.html.markdown b/website/source/docs/guides/semaphore.html.markdown new file mode 100644 index 0000000000..bb56cffc6f --- /dev/null +++ b/website/source/docs/guides/semaphore.html.markdown @@ -0,0 +1,133 @@ +--- +layout: "docs" +page_title: "Semaphore" +sidebar_current: "docs-guides-semaphore" +description: |- + This guide demonstrates how to implement a distributed semaphore using the Consul Key/Value store. +--- + +# Semaphore + +The goal of this guide is to cover how to build a client-side semaphore using Consul. +This is useful when you want to coordinate many services while restricting access to +certain resources. + +If you only need mutual exclusion or leader election, [this guide](/docs/guides/leader-election.html) +provides a simpler algorithm that can be used instead. + +There are a number of ways that a semaphore can be built, so our goal is not to +cover all the possible methods. Instead, we will focus on using Consul's support for +[sessions](/docs/internals/sessions.html), which allow us to build a system that can +gracefully handle failures. + +Note that JSON output in this guide has been pretty-printed for easier +reading. Actual values returned from the API will not be formatted. + +## Contending Nodes + +The primary flow is for nodes who are attempting to acquire a slot in the semaphore. +All nodes that are participating should agree on a given prefix being used to coordinate, +a single lock key, and a limit of slot holders. A good choice is simply: + +```text +service//lock/ +``` + +We will refer to this as just `` for simplicity. + +The first step is to create a session. This is done using the [/v1/session/create endpoint][session-api]: + +[session-api]: http://www.consul.io/docs/agent/http.html#_v1_session_create + +```text +curl -X PUT -d '{"Name": "dbservice"}' \ + http://localhost:8500/v1/session/create + ``` + +This will return a JSON object contain the session ID: + +```text +{ + "ID": "4ca8e74b-6350-7587-addf-a18084928f3c" +} +``` + +The session by default makes use of only the gossip failure detector. Additional checks +can be specified if desired. + +Next, we create a contender entry. Each contender makes an entry that is tied +to a session. This is done so that if a contender is holding a slot and fails +it can be detected by the other contenders. Optionally, an opaque value +can be associated with the contender via a ``. + +Create the contender key by doing an `acquire` on `/` by doing a `PUT`. +This is something like: + +```text +curl -X PUT -d http://localhost:8500/v1/kv//?acquire= + ``` + +Where `` is the ID returned by the call to `/v1/session/create`. + +This will either return `true` or `false`. If `true` is returned, the contender +entry has been created. If `false` is returned, the contender node was not created and +likely this indicates a session invalidation. + +The next step is to use a single key to coordinate which holders are currently +reserving a slot. A good choice is simply `/.lock`. We will refer to this +special coordinating key as ``. The current state of the semaphore is read by +doing a `GET` on the entire ``: + +```text +curl http://localhost:8500/v1/kv/?recurse + ``` + +Within the list of the entries, we should find the ``. That entry should hold +both the slot limit and the current holders. A simple JSON body like the following works: + +```text +{ + "Limit": 3, + "Holders": { + "4ca8e74b-6350-7587-addf-a18084928f3c": true, + "adf4238a-882b-9ddc-4a9d-5b6758e4159e": true + } +} +``` + +When the `` is read, we can verify the remote `Limit` agrees with the local value. This +is used to detect a potential conflict. The next step is to determine which of the current +slot holders are still alive. As part of the results of the `GET`, we have all the contender +entries. By scanning those entries, we create a set of all the `Session` values. Any of the +`Holders` that are not in that set are pruned. In effect, we are creating a set of live contenders +based on the list results, and doing a set difference with the `Holders` to detect and prune +any potentially failed holders. + +If the number of holders (after pruning) is less than the limit, a contender attempts acquisition +by adding its own session to the `Holders` and doing a Check-And-Set update of the ``. This +performs an optimistic update. + +This is done by: + +```text +curl -X PUT -d http://localhost:8500/v1/kv/?cas= + ``` + +If this suceeds with `true` the condenter now holds a slot in the semaphore. If this fails +with `false`, then likely there was a race with another contender to acquire the slot. +Both code paths now go into an idle waiting state. In this state, we watch for changes +on ``. This is because a slot may be released, a node may fail, etc. +Slot holders must also watch for changes since the slot may be released by an operator, +or automatically released due to a false positive in the failure detector. + +Watching for changes is done by doing a blocking query against ``. If a contender +holds a slot, then on any change the `` should be re-checked to ensure the slot is +still held. If no slot is held, then the same acquisition logic is triggered to check +and potentially re-attempt acquisition. This allows a contender to steal the slot from +a failed contender or one that has voluntarily released its slot. + +If a slot holder ever wishes to release voluntarily, this should be done by doing a +Check-And-Set operation against `` to remove its session from the `Holders`. Once +that is done, the contender entry at `/` should be delete. Finally the +session should be destroyed. + diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index d6764e7929..d6c5e2aaa7 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -187,6 +187,10 @@ > Outage Recovery + + > + Semaphore + > From d6d1f19f1b908417f00a02d2608735da114b2093 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 16:43:38 -1000 Subject: [PATCH 09/12] website: Document the lock command --- .../source/docs/commands/index.html.markdown | 2 + .../source/docs/commands/lock.html.markdown | 61 +++++++++++++++++++ website/source/layouts/docs.erb | 4 ++ 3 files changed, 67 insertions(+) create mode 100644 website/source/docs/commands/lock.html.markdown diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index fafca6ed90..0b5c621369 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -33,7 +33,9 @@ Available commands are: info Provides debugging information for operators join Tell Consul agent to join cluster keygen Generates a new encryption key + keyring Manages gossip layer encryption keys leave Gracefully leaves the Consul cluster and shuts down + lock Execute a command holding a lock members Lists the members of a Consul cluster monitor Stream logs from a Consul agent reload Triggers the agent to reload configuration files diff --git a/website/source/docs/commands/lock.html.markdown b/website/source/docs/commands/lock.html.markdown new file mode 100644 index 0000000000..a2c3f77e98 --- /dev/null +++ b/website/source/docs/commands/lock.html.markdown @@ -0,0 +1,61 @@ +--- +layout: "docs" +page_title: "Commands: Lock" +sidebar_current: "docs-commands-lock" +description: |- + The lock command provides a mechanism for leader election, mutual exclusion, + or worker pools. For example, this can be used to ensure a maximum number of + services running at once across a cluster. +--- + +# Consul Lock + +Command: `consul lock` + +The `lock` command provides a mechanism for simple distributed locking. +A lock (or semaphore) is created at a given prefix in the Key/Value store, +and only when held, is a child process invoked. If the lock is lost or +communication disrupted, the child process is terminated.A + +The number of lock holder is configurable with the `-n` flag. By default, +a single holder is allowed, and a lock is used for mutual exclusion. This +uses the [leader election algorithm](/docs/guides/leader-election.html). + +If the lock holder count is more than one, then a semaphore is used instead. +A semaphore allows more than a single holder, but the is less efficient than +a simple lock. This follows the [semaphore algorithm](/docs/guides/semaphore.html). + +An example use case is for highly-available N+1 deployments. In these +cases, if N instances of a service are required, N+1 are deployed and use +consul lock with `-n=N` to ensure only N instances are running. For singleton +services, a hot standby waits until the current leader fails to take over. + +## Usage + +Usage: `consul lock [options] prefix child...` + +The only required options are the key prefix and the command to execute. +The prefix must be writable. The child is invoked only when the lock is held, +and the `CONSUL_LOCK_HELD` environment variable will be set to `true`. + +If the lock is lost, communication disrupted, or the parent process interrupted, +the child process will receive a `SIGTERM`. After a grace period, a `SIGKILL` +will be used to force termination. + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-n` - Optional, limit of lock holders. Defaults to 1. The underlying + implementation switches from a lock to a semaphore when increased past + one. + +* `-name` - Optional name to associate with the underlying session. + If not provided, one is generated based on the child command. + +* `-token` - ACL token to use. Defaults to that of agent. + +* `-verbose` - Enables verbose output. + diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index d6c5e2aaa7..7b44f3e9b0 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -85,6 +85,10 @@ > leave + + + > + lock > From 7195de51a2940bc0c34abcae79ce36bcf1c36c53 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 19 Jan 2015 16:53:47 -1000 Subject: [PATCH 10/12] api: Refactor to share session renew code --- api/lock.go | 27 ++------------------------- api/semaphore.go | 27 ++------------------------- api/session.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 50 deletions(-) diff --git a/api/lock.go b/api/lock.go index b8abbd476d..2b5aa2a290 100644 --- a/api/lock.go +++ b/api/lock.go @@ -130,7 +130,8 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { } else { l.sessionRenew = make(chan struct{}) l.lockSession = s - go l.renewSession(s, l.sessionRenew) + session := l.c.Session() + go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew) // If we fail to acquire the lock, cleanup the session defer func() { @@ -302,30 +303,6 @@ func (l *Lock) lockEntry(session string) *KVPair { } } -// renewSession is a long running routine that maintians a session -// by doing a periodic Session renewal. -func (l *Lock) renewSession(id string, doneCh chan struct{}) { - session := l.c.Session() - ttl, _ := time.ParseDuration(l.opts.SessionTTL) - for { - select { - case <-time.After(ttl / 2): - entry, _, err := session.Renew(id, nil) - if err != nil || entry == nil { - return - } - - // Handle the server updating the TTL - ttl, _ = time.ParseDuration(entry.TTL) - - case <-doneCh: - // Attempt a session destroy - session.Destroy(id, nil) - return - } - } -} - // monitorLock is a long running routine to monitor a lock ownership // It closes the stopCh if we lose our leadership. func (l *Lock) monitorLock(session string, stopCh chan struct{}) { diff --git a/api/semaphore.go b/api/semaphore.go index 7139c40dba..957f884a4d 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -155,7 +155,8 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) { } else { s.sessionRenew = make(chan struct{}) s.lockSession = sess - go s.renewSession(sess, s.sessionRenew) + session := s.c.Session() + go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew) // If we fail to acquire the lock, cleanup the session defer func() { @@ -384,30 +385,6 @@ func (s *Semaphore) createSession() (string, error) { return id, nil } -// renewSession is a long running routine that maintians a session -// by doing a periodic Session renewal. -func (s *Semaphore) renewSession(id string, doneCh chan struct{}) { - session := s.c.Session() - ttl, _ := time.ParseDuration(s.opts.SessionTTL) - for { - select { - case <-time.After(ttl / 2): - entry, _, err := session.Renew(id, nil) - if err != nil || entry == nil { - return - } - - // Handle the server updating the TTL - ttl, _ = time.ParseDuration(entry.TTL) - - case <-doneCh: - // Attempt a session destroy - session.Destroy(id, nil) - return - } - } -} - // contenderEntry returns a formatted KVPair for the contender func (s *Semaphore) contenderEntry(session string) *KVPair { return &KVPair{ diff --git a/api/session.go b/api/session.go index e889bbe0de..bb84644fd9 100644 --- a/api/session.go +++ b/api/session.go @@ -147,6 +147,36 @@ func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, return nil, wm, nil } +// RenewPeriodic is used to periodically invoke Session.Renew on a +// session until a doneCh is closed. This is meant to be used in a long running +// goroutine to ensure a session stays valid. +func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error { + ttl, err := time.ParseDuration(initialTTL) + if err != nil { + return err + } + for { + select { + case <-time.After(ttl / 2): + entry, _, err := s.Renew(id, q) + if err != nil { + return err + } + if entry == nil { + return nil + } + + // Handle the server updating the TTL + ttl, _ = time.ParseDuration(entry.TTL) + + case <-doneCh: + // Attempt a session destroy + s.Destroy(id, q) + return nil + } + } +} + // Info looks up a single session func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) { r := s.c.newRequest("GET", "/v1/session/info/"+id) From 2e281dc51fa34e0c320f7114fbb67e6949ce73ee Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 20 Jan 2015 12:58:09 -0800 Subject: [PATCH 11/12] api: Fixing some comments --- api/lock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/lock.go b/api/lock.go index 2b5aa2a290..f6fdbbb166 100644 --- a/api/lock.go +++ b/api/lock.go @@ -26,8 +26,8 @@ const ( DefaultLockRetryTime = 5 * time.Second // LockFlagValue is a magic flag we set to indicate a key - // is being used for a semaphore. It is used to detect a potential - // conflict with a lock. + // is being used for a lock. It is used to detect a potential + // conflict with a semaphore. LockFlagValue = 0x2ddccbc058a50c18 ) From ce4fa17d225f9cc5d331b1c5bebd68e0fbfd0c1e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 20 Jan 2015 12:58:29 -0800 Subject: [PATCH 12/12] command/lock: Fixing mixed spaces and tabs --- command/lock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/lock.go b/command/lock.go index 96b7dc1247..7deb54b46b 100644 --- a/command/lock.go +++ b/command/lock.go @@ -59,7 +59,7 @@ Options: -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. -n=1 Maximum number of allowed lock holders. If this value is one, it operates as a lock, otherwise - a semaphore is used. + a semaphore is used. -name="" Optional name to associate with lock session. -token="" ACL token to use. Defaults to that of agent. -verbose Enables verbose output