Fixing merge conflict

Armon Dadgar 2015-01-20 13:01:13 -08:00
commit d998bb3d67
14 changed files with 758 additions and 53 deletions

View File

@ -16,6 +16,22 @@ import (
"github.com/hashicorp/consul/testutil"
)
var consulConfig = `{
"ports": {
"dns": 19000,
"http": 18800,
"rpc": 18600,
"serf_lan": 18200,
"serf_wan": 18400,
"server": 18000
},
"bind_addr": "127.0.0.1",
"data_dir": "%s",
"bootstrap": true,
"log_level": "debug",
"server": true
}`
type testServer struct {
pid int
dataDir string

View File

@ -24,6 +24,11 @@ const (
// before attempting to do the lock again. This is so that once a lock-delay
// is in effect, we do not hot-loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second
// LockFlagValue is a magic flag we set to indicate a key
// is being used for a lock. It is used to detect a potential
// conflict with a semaphore.
LockFlagValue = 0x2ddccbc058a50c18
)
var (
@ -37,6 +42,10 @@ var (
// ErrLockInUse is returned if we attempt to destroy a lock
// that is in use.
ErrLockInUse = fmt.Errorf("Lock in use")
// ErrLockConflict is returned if the flags on a key
// used for a lock do not match expectation
ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
)
// Lock is used to implement client-side leader election. It follows the
@ -103,7 +112,7 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the lock being lost.
func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) {
func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Hold the lock as we try to acquire
l.l.Lock()
defer l.l.Unlock()
@ -121,7 +130,8 @@ func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) {
} else {
l.sessionRenew = make(chan struct{})
l.lockSession = s
go l.renewSession(s, l.sessionRenew)
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
@ -152,6 +162,9 @@ WAIT:
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
if pair != nil && pair.Flags != LockFlagValue {
return nil, ErrLockConflict
}
if pair != nil && pair.Session != "" {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
@ -245,6 +258,11 @@ func (l *Lock) Destroy() error {
return nil
}
// Check for possible flag conflict
if pair.Flags != LockFlagValue {
return ErrLockConflict
}
// Check if it is in use
if pair.Session != "" {
return ErrLockInUse
@ -281,30 +299,7 @@ func (l *Lock) lockEntry(session string) *KVPair {
Key: l.opts.Key,
Value: l.opts.Value,
Session: session,
}
}
// renewSession is a long-running routine that maintains a session
// by doing a periodic Session renewal.
func (l *Lock) renewSession(id string, doneCh chan struct{}) {
session := l.c.Session()
ttl, _ := time.ParseDuration(l.opts.SessionTTL)
for {
select {
case <-time.After(ttl / 2):
entry, _, err := session.Renew(id, nil)
if err != nil || entry == nil {
return
}
// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)
case <-doneCh:
// Attempt a session destroy
session.Destroy(id, nil)
return
}
Flags: LockFlagValue,
}
}
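The flag check added above means a key already carrying the semaphore flag can no longer be acquired as a lock. A minimal sketch of how a caller might handle the new error, assuming a local agent on the default address (the key path is illustrative):

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// Connect to a local agent; DefaultConfig targets 127.0.0.1:8500.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	lock, err := client.LockKey("service/db/lock/.lock")
	if err != nil {
		log.Fatal(err)
	}

	leaderCh, err := lock.Lock(nil)
	if err == api.ErrLockConflict {
		// The key's flags mark it as a semaphore; refuse to treat it as a lock.
		log.Fatal("key is already in use by a semaphore")
	} else if err != nil {
		log.Fatal(err)
	}
	if leaderCh == nil {
		log.Fatal("lock was not acquired")
	}
	defer lock.Unlock()

	// ... do work while holding the lock; leaderCh is closed if it is lost ...
}
```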

View File

@ -250,3 +250,40 @@ func TestLock_Destroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestLock_Conflict(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
sema, err := c.SemaphorePrefix("test/lock/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not hold")
}
defer sema.Release()
lock, err := c.LockKey("test/lock/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
_, err = lock.Lock(nil)
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
err = lock.Destroy()
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
}

View File

@ -30,6 +30,11 @@ const (
// DefaultSemaphoreKey is the key used within the prefix to
// use for coordination between all the contenders.
DefaultSemaphoreKey = ".lock"
// SemaphoreFlagValue is a magic flag we set to indicate a key
// is being used for a semaphore. It is used to detect a potential
// conflict with a lock.
SemaphoreFlagValue = 0xe0f69a2baa414de0
)
var (
@ -43,6 +48,10 @@ var (
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
// that is in use.
ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")
// ErrSemaphoreConflict is returned if the flags on a key
// used for a semaphore do not match expectation
ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use")
)
// Semaphore is used to implement a distributed semaphore
@ -128,7 +137,7 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the session being lost.
func (s *Semaphore) Acquire(stopCh chan struct{}) (chan struct{}, error) {
func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Hold the lock as we try to acquire
s.l.Lock()
defer s.l.Unlock()
@ -146,7 +155,8 @@ func (s *Semaphore) Acquire(stopCh chan struct{}) (chan struct{}, error) {
} else {
s.sessionRenew = make(chan struct{})
s.lockSession = sess
go s.renewSession(sess, s.sessionRenew)
session := s.c.Session()
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
@ -186,6 +196,9 @@ WAIT:
// Decode the lock
lockPair := s.findLock(pairs)
if lockPair.Flags != SemaphoreFlagValue {
return nil, ErrSemaphoreConflict
}
lock, err := s.decodeLock(lockPair)
if err != nil {
return nil, err
@ -328,6 +341,9 @@ func (s *Semaphore) Destroy() error {
if lockPair.ModifyIndex == 0 {
return nil
}
if lockPair.Flags != SemaphoreFlagValue {
return ErrSemaphoreConflict
}
// Decode the lock
lock, err := s.decodeLock(lockPair)
@ -369,36 +385,13 @@ func (s *Semaphore) createSession() (string, error) {
return id, nil
}
// renewSession is a long-running routine that maintains a session
// by doing a periodic Session renewal.
func (s *Semaphore) renewSession(id string, doneCh chan struct{}) {
session := s.c.Session()
ttl, _ := time.ParseDuration(s.opts.SessionTTL)
for {
select {
case <-time.After(ttl / 2):
entry, _, err := session.Renew(id, nil)
if err != nil || entry == nil {
return
}
// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)
case <-doneCh:
// Attempt a session destroy
session.Destroy(id, nil)
return
}
}
}
// contenderEntry returns a formatted KVPair for the contender
func (s *Semaphore) contenderEntry(session string) *KVPair {
return &KVPair{
Key: path.Join(s.opts.Prefix, session),
Value: s.opts.Value,
Session: session,
Flags: SemaphoreFlagValue,
}
}
@ -410,7 +403,7 @@ func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
return pair
}
}
return &KVPair{}
return &KVPair{Flags: SemaphoreFlagValue}
}
// decodeLock is used to decode a semaphoreLock from an
@ -441,6 +434,7 @@ func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, erro
pair := &KVPair{
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
Value: enc,
Flags: SemaphoreFlagValue,
ModifyIndex: oldIndex,
}
return pair, nil

View File

@ -267,3 +267,40 @@ func TestSemaphore_Destroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestSemaphore_Conflict(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
lock, err := c.LockKey("test/sema/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
sema, err := c.SemaphorePrefix("test/sema/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
_, err = sema.Acquire(nil)
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
err = sema.Destroy()
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
}

View File

@ -147,6 +147,36 @@ func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta,
return nil, wm, nil
}
// RenewPeriodic is used to periodically invoke Session.Renew on a
// session until a doneCh is closed. This is meant to be used in a long running
// goroutine to ensure a session stays valid.
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error {
ttl, err := time.ParseDuration(initialTTL)
if err != nil {
return err
}
for {
select {
case <-time.After(ttl / 2):
entry, _, err := s.Renew(id, q)
if err != nil {
return err
}
if entry == nil {
return nil
}
// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)
case <-doneCh:
// Attempt a session destroy
s.Destroy(id, q)
return nil
}
}
}
// Info looks up a single session
func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/session/info/"+id)
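A rough sketch of how a caller might pair the new RenewPeriodic helper with a TTL session, replacing the per-type renewSession goroutines removed above; the session name and TTL are illustrative, and Session.Create is assumed to follow the signature used elsewhere in this package:

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	session := client.Session()

	// Create a TTL-based session that must be renewed to stay valid.
	id, _, err := session.Create(&api.SessionEntry{
		Name: "example-service",
		TTL:  "15s",
	}, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Renew in the background until doneCh is closed; closing doneCh also
	// destroys the session, mirroring the old renewSession behavior.
	doneCh := make(chan struct{})
	go session.RenewPeriodic("15s", id, nil, doneCh)

	// ... use the session for locks, semaphores, or KV acquires ...

	close(doneCh)
}
```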

346 command/lock.go Normal file
View File

@ -0,0 +1,346 @@
package command
import (
"flag"
"fmt"
"os"
"path"
"strings"
"sync"
"syscall"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/agent"
"github.com/mitchellh/cli"
)
const (
// lockKillGracePeriod is how long we allow a child between
// a SIGTERM and a SIGKILL. This is to let the child clean up
// any necessary state. We have to balance this with the risk
// of a split-brain where multiple children may be acting as if
// they hold a lock. This value is currently based on the default
// lock-delay value of 15 seconds. This only affects locks and not
// semaphores.
lockKillGracePeriod = 5 * time.Second
)
// LockCommand is a Command implementation that is used to set up
// a "lock" which manages lock acquisition and invokes a sub-process
type LockCommand struct {
ShutdownCh <-chan struct{}
Ui cli.Ui
child *os.Process
childLock sync.Mutex
verbose bool
}
func (c *LockCommand) Help() string {
helpText := `
Usage: consul lock [options] prefix child...
Acquires a lock or semaphore at a given path, and invokes a child
process when successful. The child process can assume the lock is
held while it executes. If the lock is lost or communication is disrupted,
the child process will be sent a SIGTERM signal and given time to
gracefully exit. After the grace period expires the process will
be hard terminated.
When -n=1, only a single lock holder or leader exists, providing
mutual exclusion. Setting a higher value switches to a semaphore
allowing multiple holders to coordinate.
The prefix provided must have write privileges.
Options:
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
-n=1 Maximum number of allowed lock holders. If this
value is one, it operates as a lock, otherwise
a semaphore is used.
-name="" Optional name to associate with lock session.
-token="" ACL token to use. Defaults to that of agent.
-verbose Enables verbose output
`
return strings.TrimSpace(helpText)
}
func (c *LockCommand) Run(args []string) int {
var name, token string
var limit int
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.IntVar(&limit, "n", 1, "")
cmdFlags.StringVar(&name, "name", "", "")
cmdFlags.StringVar(&token, "token", "", "")
cmdFlags.BoolVar(&c.verbose, "verbose", false, "")
httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil {
return 1
}
// Check the limit
if limit <= 0 {
c.Ui.Error(fmt.Sprintf("Lock holder limit must be positive"))
return 1
}
// Verify the prefix and child are provided
extra := cmdFlags.Args()
if len(extra) < 2 {
c.Ui.Error("Key prefix and child command must be specified")
c.Ui.Error("")
c.Ui.Error(c.Help())
return 1
}
prefix := extra[0]
script := strings.Join(extra[1:], " ")
// Calculate a session name if none provided
if name == "" {
name = fmt.Sprintf("Consul lock for '%s' at '%s'", script, prefix)
}
// Create and test the HTTP client
conf := api.DefaultConfig()
conf.Address = *httpAddr
conf.Token = token
client, err := api.NewClient(conf)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
_, err = client.Agent().NodeName()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1
}
// Setup the lock or semaphore
var lu *LockUnlock
if limit == 1 {
lu, err = c.setupLock(client, prefix, name)
} else {
lu, err = c.setupSemaphore(client, limit, prefix, name)
}
if err != nil {
c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err))
return 1
}
// Attempt the acquisition
if c.verbose {
c.Ui.Info("Attempting lock acquisition")
}
lockCh, err := lu.lockFn(c.ShutdownCh)
if err != nil || lockCh == nil {
c.Ui.Error(fmt.Sprintf("Lock acquisition failed: %s", err))
return 1
}
// Start the child process
childDone := make(chan struct{})
go func() {
if err := c.startChild(script, childDone); err != nil {
c.Ui.Error(fmt.Sprintf("%s", err))
}
}()
// Monitor for shutdown, child termination, or lock loss
select {
case <-c.ShutdownCh:
if c.verbose {
c.Ui.Info("Shutdown triggered, killing child")
}
case <-lockCh:
if c.verbose {
c.Ui.Info("Lock lost, killing child")
}
case <-childDone:
if c.verbose {
c.Ui.Info("Child terminated, releasing lock")
}
goto RELEASE
}
// Kill the child
if err := c.killChild(childDone); err != nil {
c.Ui.Error(fmt.Sprintf("%s", err))
}
RELEASE:
// Release the lock before termination
if err := lu.unlockFn(); err != nil {
c.Ui.Error(fmt.Sprintf("Lock release failed: %s", err))
return 1
}
// Cleanup the lock if no longer in use
if err := lu.cleanupFn(); err != nil {
if err != lu.inUseErr {
c.Ui.Error(fmt.Sprintf("Lock cleanup failed: %s", err))
return 1
} else if c.verbose {
c.Ui.Info("Cleanup aborted, lock in use")
}
} else if c.verbose {
c.Ui.Info("Cleanup succeeded")
}
return 0
}
// setupLock is used to setup a new Lock given the API client,
// the key prefix to operate on, and an optional session name.
func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockUnlock, error) {
// Use the DefaultSemaphoreKey extension; this way, if a lock and
// semaphore are both used at the same prefix, we will get a conflict
// which we can report to the user.
key := path.Join(prefix, api.DefaultSemaphoreKey)
if c.verbose {
c.Ui.Info(fmt.Sprintf("Setting up lock at path: %s", key))
}
opts := api.LockOptions{
Key: key,
SessionName: name,
}
l, err := client.LockOpts(&opts)
if err != nil {
return nil, err
}
lu := &LockUnlock{
lockFn: l.Lock,
unlockFn: l.Unlock,
cleanupFn: l.Destroy,
inUseErr: api.ErrLockInUse,
}
return lu, nil
}
// setupSemaphore is used to setup a new Semaphore given the
// API client, key prefix, session name, and slot holder limit.
func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string) (*LockUnlock, error) {
if c.verbose {
c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix))
}
opts := api.SemaphoreOptions{
Prefix: prefix,
Limit: limit,
SessionName: name,
}
s, err := client.SemaphoreOpts(&opts)
if err != nil {
return nil, err
}
lu := &LockUnlock{
lockFn: s.Acquire,
unlockFn: s.Release,
cleanupFn: s.Destroy,
inUseErr: api.ErrSemaphoreInUse,
}
return lu, nil
}
// startChild is a long running routine used to start and
// wait for the child process to exit.
func (c *LockCommand) startChild(script string, doneCh chan struct{}) error {
defer close(doneCh)
if c.verbose {
c.Ui.Info(fmt.Sprintf("Starting handler '%s'", script))
}
// Create the command
cmd, err := agent.ExecScript(script)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err))
return err
}
// Setup the command streams
cmd.Env = append(os.Environ(),
"CONSUL_LOCK_HELD=true",
)
cmd.Stdin = nil
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Start the child process
if err := cmd.Start(); err != nil {
c.Ui.Error(fmt.Sprintf("Error starting handler: %s", err))
return err
}
// Setup the child info
c.childLock.Lock()
c.child = cmd.Process
c.childLock.Unlock()
// Wait for the child process
if err := cmd.Wait(); err != nil {
c.Ui.Error(fmt.Sprintf("Error running handler: %s", err))
return err
}
return nil
}
// killChild is used to forcefully kill the child, first using SIGTERM
// to allow for a graceful cleanup and then using SIGKILL for a hard
// termination.
func (c *LockCommand) killChild(childDone chan struct{}) error {
// Get the child process
c.childLock.Lock()
child := c.child
c.childLock.Unlock()
// If there is no child process (failed to start), we can quit early
if child == nil {
if c.verbose {
c.Ui.Info("No child process to kill")
}
return nil
}
// Attempt a SIGTERM first
if c.verbose {
c.Ui.Info(fmt.Sprintf("Sending SIGTERM to child pid %d", child.Pid))
}
if err := syscall.Kill(child.Pid, syscall.SIGTERM); err != nil {
return fmt.Errorf("Failed to terminate %d: %v", child.Pid, err)
}
// Wait for termination, or until a timeout
select {
case <-childDone:
if c.verbose {
c.Ui.Info("Child exited after SIGTERM")
}
return nil
case <-time.After(lockKillGracePeriod):
if c.verbose {
c.Ui.Info(fmt.Sprintf("Child did not exit after grace period of %v",
lockKillGracePeriod))
}
}
// Send a final SIGKILL
if c.verbose {
c.Ui.Info(fmt.Sprintf("Sending SIGKILL to child pid %d", child.Pid))
}
if err := syscall.Kill(child.Pid, syscall.SIGKILL); err != nil {
return fmt.Errorf("Failed to kill %d: %v", child.Pid, err)
}
return nil
}
func (c *LockCommand) Synopsis() string {
return "Execute a command holding a lock"
}
// LockUnlock is used to abstract over the differences between
// a lock and a semaphore.
type LockUnlock struct {
lockFn func(<-chan struct{}) (<-chan struct{}, error)
unlockFn func() error
cleanupFn func() error
inUseErr error
}

37 command/lock_test.go Normal file
View File

@ -0,0 +1,37 @@
package command
import (
"fmt"
"io/ioutil"
"path/filepath"
"testing"
"github.com/mitchellh/cli"
)
func TestLockCommand_implements(t *testing.T) {
var _ cli.Command = &LockCommand{}
}
func TestLockCommandRun(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
filePath := filepath.Join(a1.dir, "test_touch")
touchCmd := fmt.Sprintf("touch '%s'", filePath)
args := []string{"-http-addr=" + a1.httpAddr, "test/prefix", touchCmd}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// Check for the file
_, err := ioutil.ReadFile(filePath)
if err != nil {
t.Fatalf("err: %v", err)
}
}

View File

@ -69,6 +69,13 @@ func init() {
}, nil
},
"lock": func() (cli.Command, error) {
return &command.LockCommand{
ShutdownCh: makeShutdownCh(),
Ui: ui,
}, nil
},
"members": func() (cli.Command, error) {
return &command.MembersCommand{
Ui: ui,

View File

@ -33,7 +33,9 @@ Available commands are:
info Provides debugging information for operators
join Tell Consul agent to join cluster
keygen Generates a new encryption key
keyring Manages gossip layer encryption keys
leave Gracefully leaves the Consul cluster and shuts down
lock Execute a command holding a lock
members Lists the members of a Consul cluster
monitor Stream logs from a Consul agent
reload Triggers the agent to reload configuration files

View File

@ -0,0 +1,61 @@
---
layout: "docs"
page_title: "Commands: Lock"
sidebar_current: "docs-commands-lock"
description: |-
The lock command provides a mechanism for leader election, mutual exclusion,
or worker pools. For example, this can be used to ensure a maximum number of
services running at once across a cluster.
---
# Consul Lock
Command: `consul lock`
The `lock` command provides a mechanism for simple distributed locking.
A lock (or semaphore) is created at a given prefix in the Key/Value store,
and only when held, is a child process invoked. If the lock is lost or
communication is disrupted, the child process is terminated.
The number of lock holders is configurable with the `-n` flag. By default,
a single holder is allowed, and a lock is used for mutual exclusion. This
uses the [leader election algorithm](/docs/guides/leader-election.html).
If the lock holder count is more than one, then a semaphore is used instead.
A semaphore allows more than a single holder, but it is less efficient than
a simple lock. This follows the [semaphore algorithm](/docs/guides/semaphore.html).
An example use case is for highly-available N+1 deployments. In these
cases, if N instances of a service are required, N+1 are deployed and use
`consul lock` with `-n=N` to ensure only N instances are running. For singleton
services, a hot standby waits until the current leader fails to take over.
## Usage
Usage: `consul lock [options] prefix child...`
The only required options are the key prefix and the command to execute.
The prefix must be writable. The child is invoked only when the lock is held,
and the `CONSUL_LOCK_HELD` environment variable will be set to `true`.
If the lock is lost, communication disrupted, or the parent process interrupted,
the child process will receive a `SIGTERM`. After a grace period, a `SIGKILL`
will be used to force termination.
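For example, a hypothetical invocation such as `consul lock -n=3 service/redis/lock "redis-server /etc/redis/redis.conf"` would allow up to three holders of the semaphore at `service/redis/lock` to run the child command concurrently.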
The list of available flags is:
* `-http-addr` - Address to the HTTP server of the agent you want to contact
to send this command. If this isn't specified, the command will contact
"127.0.0.1:8500" which is the default HTTP address of a Consul agent.
* `-n` - Optional, limit of lock holders. Defaults to 1. The underlying
implementation switches from a lock to a semaphore when increased past
one.
* `-name` - Optional name to associate with the underlying session.
If not provided, one is generated based on the child command.
* `-token` - ACL token to use. Defaults to that of agent.
* `-verbose` - Enables verbose output.

View File

@ -27,3 +27,5 @@ The following guides are available:
* [Multiple Datacenters](/docs/guides/datacenters.html) - Configuring Consul to support multiple datacenters.
* [Outage Recovery](/docs/guides/outage.html) - This guide covers recovering a cluster that has become unavailable due to server failures.
* [Semaphore](/docs/guides/semaphore.html) - This guide covers using the Key/Value store to implement a semaphore.

View File

@ -0,0 +1,133 @@
---
layout: "docs"
page_title: "Semaphore"
sidebar_current: "docs-guides-semaphore"
description: |-
This guide demonstrates how to implement a distributed semaphore using the Consul Key/Value store.
---
# Semaphore
The goal of this guide is to cover how to build a client-side semaphore using Consul.
This is useful when you want to coordinate many services while restricting access to
certain resources.
If you only need mutual exclusion or leader election, [this guide](/docs/guides/leader-election.html)
provides a simpler algorithm that can be used instead.
There are a number of ways that a semaphore can be built, so our goal is not to
cover all the possible methods. Instead, we will focus on using Consul's support for
[sessions](/docs/internals/sessions.html), which allow us to build a system that can
gracefully handle failures.
Note that JSON output in this guide has been pretty-printed for easier
reading. Actual values returned from the API will not be formatted.
## Contending Nodes
The primary flow is for nodes that are attempting to acquire a slot in the semaphore.
All nodes that are participating should agree on a given prefix being used to coordinate,
a single lock key, and a limit of slot holders. A good choice is simply:
```text
service/<service name>/lock/
```
We will refer to this as just `<prefix>` for simplicity.
The first step is to create a session. This is done using the [/v1/session/create endpoint][session-api]:
[session-api]: http://www.consul.io/docs/agent/http.html#_v1_session_create
```text
curl -X PUT -d '{"Name": "dbservice"}' \
http://localhost:8500/v1/session/create
```
This will return a JSON object containing the session ID:
```text
{
"ID": "4ca8e74b-6350-7587-addf-a18084928f3c"
}
```
The session by default makes use of only the gossip failure detector. Additional checks
can be specified if desired.
Next, we create a contender entry. Each contender makes an entry that is tied
to a session. This is done so that if a contender is holding a slot and fails,
it can be detected by the other contenders. Optionally, an opaque value
can be associated with the contender via a `<body>`.
Create the contender key by doing an `acquire` on `<prefix>/<session>` with a `PUT`.
This is something like:
```text
curl -X PUT -d <body> http://localhost:8500/v1/kv/<prefix>/<session>?acquire=<session>
```
Where `<session>` is the ID returned by the call to `/v1/session/create`.
This will either return `true` or `false`. If `true` is returned, the contender
entry has been created. If `false` is returned, the contender entry was not created,
and this likely indicates a session invalidation.
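For reference, a rough Go equivalent of this step using the client library in this repository (the prefix, session name, and body below are illustrative):

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Create the session that the contender entry will be tied to.
	id, _, err := client.Session().Create(&api.SessionEntry{Name: "dbservice"}, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Create the contender entry via an acquire on <prefix>/<session>.
	pair := &api.KVPair{
		Key:     "service/dbservice/lock/" + id,
		Value:   []byte("contender metadata"),
		Session: id,
	}
	created, _, err := client.KV().Acquire(pair, nil)
	if err != nil {
		log.Fatal(err)
	}
	if !created {
		log.Fatal("contender entry not created; the session was likely invalidated")
	}
}
```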
The next step is to use a single key to coordinate which holders are currently
reserving a slot. A good choice is simply `<prefix>/.lock`. We will refer to this
special coordinating key as `<lock>`. The current state of the semaphore is read by
doing a `GET` on the entire `<prefix>`:
```text
curl http://localhost:8500/v1/kv/<prefix>?recurse
```
Within the list of the entries, we should find the `<lock>`. That entry should hold
both the slot limit and the current holders. A simple JSON body like the following works:
```text
{
"Limit": 3,
"Holders": {
"4ca8e74b-6350-7587-addf-a18084928f3c": true,
"adf4238a-882b-9ddc-4a9d-5b6758e4159e": true
}
}
```
When the `<lock>` is read, we can verify the remote `Limit` agrees with the local value. This
is used to detect a potential conflict. The next step is to determine which of the current
slot holders are still alive. As part of the results of the `GET`, we have all the contender
entries. By scanning those entries, we create a set of all the `Session` values. Any of the
`Holders` that are not in that set are pruned. In effect, we are creating a set of live contenders
based on the list results, and doing a set difference with the `Holders` to detect and prune
any potentially failed holders.
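As a sketch (the types here are illustrative, not part of the API), the pruning step amounts to a simple set difference:

```go
package semaphore

// pruneHolders drops any holder whose session no longer has a live contender
// entry, as described above. liveSessions is built from the Session field of
// the contender entries returned by the recursive GET.
func pruneHolders(holders map[string]bool, liveSessions []string) map[string]bool {
	alive := make(map[string]struct{}, len(liveSessions))
	for _, s := range liveSessions {
		alive[s] = struct{}{}
	}
	pruned := make(map[string]bool, len(holders))
	for session := range holders {
		if _, ok := alive[session]; ok {
			pruned[session] = true
		}
	}
	return pruned
}
```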
If the number of holders (after pruning) is less than the limit, a contender attempts acquisition
by adding its own session to the `Holders` and doing a Check-And-Set update of the `<lock>`. This
performs an optimistic update.
This is done by:
```text
curl -X PUT -d <Updated Lock> http://localhost:8500/v1/kv/<lock>?cas=<lock-modify-index>
```
If this succeeds with `true`, the contender now holds a slot in the semaphore. If this fails
with `false`, then there was likely a race with another contender to acquire the slot.
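In Go, the same optimistic update might look like the following sketch; the `semaphoreLock` struct simply mirrors the JSON body shown earlier and is not part of the public API:

```go
package semaphore

import (
	"encoding/json"

	"github.com/hashicorp/consul/api"
)

// semaphoreLock mirrors the JSON body stored at <lock>.
type semaphoreLock struct {
	Limit   int
	Holders map[string]bool
}

// tryAcquireSlot adds our session to the holders and attempts a Check-And-Set
// update of the lock key. A false result means another contender won the race,
// in which case the caller should re-read <prefix> and retry.
func tryAcquireSlot(kv *api.KV, lockKey, session string, lock *semaphoreLock, modifyIndex uint64) (bool, error) {
	lock.Holders[session] = true
	enc, err := json.Marshal(lock)
	if err != nil {
		return false, err
	}
	pair := &api.KVPair{
		Key:         lockKey,
		Value:       enc,
		ModifyIndex: modifyIndex,
	}
	acquired, _, err := kv.CAS(pair, nil)
	return acquired, err
}
```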
Both code paths now go into an idle waiting state. In this state, we watch for changes
on `<prefix>`. This is because a slot may be released, a node may fail, etc.
Slot holders must also watch for changes since the slot may be released by an operator,
or automatically released due to a false positive in the failure detector.
Watching for changes is done by doing a blocking query against `<prefix>`. If a contender
holds a slot, then on any change the `<lock>` should be re-checked to ensure the slot is
still held. If no slot is held, then the same acquisition logic is triggered to check
and potentially re-attempt acquisition. This allows a contender to steal the slot from
a failed contender or one that has voluntarily released its slot.
If a slot holder ever wishes to release voluntarily, this should be done by doing a
Check-And-Set operation against `<lock>` to remove its session from the `Holders`. Once
that is done, the contender entry at `<prefix>/<session>` should be deleted. Finally, the
session should be destroyed.
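Rather than driving the HTTP API by hand, the Go client library in this repository wraps this entire flow. A minimal sketch, assuming a local agent and an illustrative prefix:

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Contend for one of three slots under the prefix.
	sema, err := client.SemaphorePrefix("service/dbservice/lock/", 3)
	if err != nil {
		log.Fatal(err)
	}

	slotCh, err := sema.Acquire(nil)
	if err != nil {
		log.Fatal(err)
	}
	if slotCh == nil {
		log.Fatal("failed to acquire a slot")
	}

	// ... do work; slotCh is closed if the slot is lost ...

	if err := sema.Release(); err != nil {
		log.Fatal(err)
	}
}
```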

View File

@ -87,6 +87,10 @@
<a href="/docs/commands/leave.html">leave</a>
</li>
<li<%= sidebar_current("docs-commands-lock") %>>
<a href="/docs/commands/lock.html">lock</a>
</li>
<li<%= sidebar_current("docs-commands-members") %>>
<a href="/docs/commands/members.html">members</a>
</li>
@ -187,6 +191,10 @@
<li<%= sidebar_current("docs-guides-outage") %>>
<a href="/docs/guides/outage.html">Outage Recovery</a>
</li>
<li<%= sidebar_current("docs-guides-semaphore") %>>
<a href="/docs/guides/semaphore.html">Semaphore</a>
</li>
</ul>
<li<%= sidebar_current("docs-faq") %>>